diff options
author | d-mokhnatkin <[email protected]> | 2022-08-19 19:42:27 +0300 |
---|---|---|
committer | d-mokhnatkin <[email protected]> | 2022-08-19 19:42:27 +0300 |
commit | 9c9f16353b5f81ac978a1773139cc5769d126135 (patch) | |
tree | 432629b176605ce362739db320c246fe4df415f7 | |
parent | ab9bc5ca72f57e972d0927aae30fcff37f1d5f7b (diff) |
move parser_ut to YQ
52 files changed, 2996 insertions, 1562 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt index 24d4f633a4c..68b4951f1eb 100644 --- a/CMakeLists.darwin.txt +++ b/CMakeLists.darwin.txt @@ -65,6 +65,7 @@ add_subdirectory(contrib/tools/protoc/bin) add_subdirectory(contrib/libs/protoc) add_subdirectory(contrib/tools/protoc/plugins/cpp_styleguide) add_subdirectory(library/cpp/actors/util) +add_subdirectory(library/cpp/pop_count) add_subdirectory(library/cpp/execprofile) add_subdirectory(library/cpp/json/writer) add_subdirectory(library/cpp/json/common) @@ -377,7 +378,6 @@ add_subdirectory(library/cpp/string_utils/url) add_subdirectory(ydb/core/node_whiteboard) add_subdirectory(ydb/core/blobstorage/base) add_subdirectory(ydb/core/blobstorage/groupinfo) -add_subdirectory(library/cpp/pop_count) add_subdirectory(ydb/core/blobstorage/crypto) add_subdirectory(contrib/libs/t1ha) add_subdirectory(library/cpp/sse) @@ -1140,6 +1140,7 @@ add_subdirectory(ydb/library/schlab/mon/static) add_subdirectory(ydb/library/schlab/mon/static/css) add_subdirectory(ydb/library/schlab/mon/static/js) add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_blob_depot) +add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_blob_depot_fat) add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_donor) add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_group_reconfiguration) add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_osiris) diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt index 070baea7fbb..ea3cf3ed2e2 100644 --- a/CMakeLists.linux.txt +++ b/CMakeLists.linux.txt @@ -68,6 +68,7 @@ add_subdirectory(contrib/tools/protoc/bin) add_subdirectory(contrib/libs/protoc) add_subdirectory(contrib/tools/protoc/plugins/cpp_styleguide) add_subdirectory(library/cpp/actors/util) +add_subdirectory(library/cpp/pop_count) add_subdirectory(library/cpp/execprofile) add_subdirectory(library/cpp/json/writer) add_subdirectory(library/cpp/json/common) @@ -380,7 +381,6 @@ add_subdirectory(library/cpp/string_utils/url) add_subdirectory(ydb/core/node_whiteboard) add_subdirectory(ydb/core/blobstorage/base) add_subdirectory(ydb/core/blobstorage/groupinfo) -add_subdirectory(library/cpp/pop_count) add_subdirectory(ydb/core/blobstorage/crypto) add_subdirectory(contrib/libs/t1ha) add_subdirectory(library/cpp/sse) @@ -1144,6 +1144,7 @@ add_subdirectory(ydb/library/schlab/mon/static) add_subdirectory(ydb/library/schlab/mon/static/css) add_subdirectory(ydb/library/schlab/mon/static/js) add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_blob_depot) +add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_blob_depot_fat) add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_donor) add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_group_reconfiguration) add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_osiris) diff --git a/contrib/libs/gflags/include/gflags/gflags.h b/contrib/libs/gflags/include/gflags/gflags.h index 33d8cad2588..caadfd4aeda 100644 --- a/contrib/libs/gflags/include/gflags/gflags.h +++ b/contrib/libs/gflags/include/gflags/gflags.h @@ -223,7 +223,7 @@ extern GFLAGS_DLL_DECL bool GetCommandLineFlagInfo(const char* name, CommandLine // if (GetCommandLineFlagInfoOrDie("foo").is_default) ... extern GFLAGS_DLL_DECL CommandLineFlagInfo GetCommandLineFlagInfoOrDie(const char* name); -enum GFLAGS_DLL_DECL FlagSettingMode { +enum FlagSettingMode { // update the flag's value (can call this multiple times). SET_FLAGS_VALUE, // update the flag's value, but *only if* it has not yet been updated diff --git a/library/cpp/actors/util/CMakeLists.txt b/library/cpp/actors/util/CMakeLists.txt index 40d958d75e4..233e1fe0fc4 100644 --- a/library/cpp/actors/util/CMakeLists.txt +++ b/library/cpp/actors/util/CMakeLists.txt @@ -12,6 +12,7 @@ target_link_libraries(cpp-actors-util PUBLIC contrib-libs-cxxsupp yutil cpp-deprecated-atomic + library-cpp-pop_count ) target_sources(cpp-actors-util PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/affinity.cpp diff --git a/library/cpp/actors/util/cpu_load_log.h b/library/cpp/actors/util/cpu_load_log.h new file mode 100644 index 00000000000..e4ae6122465 --- /dev/null +++ b/library/cpp/actors/util/cpu_load_log.h @@ -0,0 +1,227 @@ +#pragma once + +#include "defs.h" +#include <library/cpp/deprecated/atomic/atomic.h> +#include <library/cpp/pop_count/popcount.h> + +static constexpr ui64 BitDurationNs = 131'072; // A power of 2 + +template <ui64 DataSize> +struct TCpuLoadLog { + static constexpr ui64 BitsSize = DataSize * 64; + TAtomic LastTimeNs = 0; + ui64 Data[DataSize]; + + TCpuLoadLog() { + LastTimeNs = 0; + for (size_t i = 0; i < DataSize; ++i) { + Data[i] = 0; + } + } + + TCpuLoadLog(ui64 timeNs) { + LastTimeNs = timeNs; + for (size_t i = 0; i < DataSize; ++i) { + Data[i] = 0; + } + } + + void RegisterBusyPeriod(ui64 timeNs) { + RegisterBusyPeriod<true>(timeNs, AtomicGet(LastTimeNs)); + } + + template <bool ModifyLastTime> + void RegisterBusyPeriod(ui64 timeNs, ui64 lastTimeNs) { + timeNs |= 1ull; + if (timeNs < lastTimeNs) { + for (ui64 i = 0; i < DataSize; ++i) { + AtomicSet(Data[i], ~0ull); + } + if (ModifyLastTime) { + AtomicSet(LastTimeNs, timeNs); + } + return; + } + const ui64 lastIdx = timeNs / BitDurationNs; + const ui64 curIdx = lastTimeNs / BitDurationNs; + ui64 firstElementIdx = curIdx / 64; + const ui64 firstBitIdx = curIdx % 64; + const ui64 lastElementIdx = lastIdx / 64; + const ui64 lastBitIdx = lastIdx % 64; + if (firstElementIdx == lastElementIdx) { + ui64 prevValue = 0; + if (firstBitIdx != 0) { + prevValue = AtomicGet(Data[firstElementIdx % DataSize]); + } + const ui64 bits = (((~0ull) << (firstBitIdx + (63-lastBitIdx))) >> (63-lastBitIdx)); + const ui64 newValue = prevValue | bits; + AtomicSet(Data[firstElementIdx % DataSize], newValue); + if (ModifyLastTime) { + AtomicSet(LastTimeNs, timeNs); + } + return; + } + // process the first element + ui64 prevValue = 0; + if (firstBitIdx != 0) { + prevValue = AtomicGet(Data[firstElementIdx % DataSize]); + } + const ui64 bits = ((~0ull) << firstBitIdx); + const ui64 newValue = (prevValue | bits); + AtomicSet(Data[firstElementIdx % DataSize], newValue); + ++firstElementIdx; + // process the fully filled elements + const ui64 firstLoop = firstElementIdx / DataSize; + const ui64 lastLoop = lastElementIdx / DataSize; + const ui64 lastOffset = lastElementIdx % DataSize; + if (firstLoop < lastLoop) { + for (ui64 i = firstElementIdx % DataSize; i < DataSize; ++i) { + AtomicSet(Data[i], ~0ull); + } + for (ui64 i = 0; i < lastOffset; ++i) { + AtomicSet(Data[i], ~0ull); + } + } else { + for (ui64 i = firstElementIdx % DataSize; i < lastOffset; ++i) { + AtomicSet(Data[i], ~0ull); + } + } + // process the last element + const ui64 newValue2 = ((~0ull) >> (63-lastBitIdx)); + AtomicSet(Data[lastOffset], newValue2); + if (ModifyLastTime) { + AtomicSet(LastTimeNs, timeNs); + } + } + + void RegisterIdlePeriod(ui64 timeNs) { + timeNs &= ~1ull; + ui64 lastTimeNs = AtomicGet(LastTimeNs); + if (timeNs < lastTimeNs) { + // Fast check first, slower chec later + if ((timeNs | 1ull) < lastTimeNs) { + // Time goes back, dont panic, just mark the whole array 'busy' + for (ui64 i = 0; i < DataSize; ++i) { + AtomicSet(Data[i], ~0ull); + } + AtomicSet(LastTimeNs, timeNs); + return; + } + } + const ui64 curIdx = lastTimeNs / BitDurationNs; + const ui64 lastIdx = timeNs / BitDurationNs; + ui64 firstElementIdx = curIdx / 64; + const ui64 lastElementIdx = lastIdx / 64; + if (firstElementIdx >= lastElementIdx) { + AtomicSet(LastTimeNs, timeNs); + return; + } + // process the first partially filled element + ++firstElementIdx; + // process all other elements + const ui64 firstLoop = firstElementIdx / DataSize; + const ui64 lastLoop = lastElementIdx / DataSize; + const ui64 lastOffset = lastElementIdx % DataSize; + if (firstLoop < lastLoop) { + for (ui64 i = firstElementIdx % DataSize; i < DataSize; ++i) { + AtomicSet(Data[i], 0); + } + for (ui64 i = 0; i <= lastOffset; ++i) { + AtomicSet(Data[i], 0); + } + } else { + for (ui64 i = firstElementIdx % DataSize; i <= lastOffset; ++i) { + AtomicSet(Data[i], 0); + } + } + AtomicSet(LastTimeNs, timeNs); + } +}; + +template <ui64 DataSize> +struct TMinusOneCpuEstimator { + static constexpr ui64 BitsSize = DataSize * 64; + ui64 BeginDelayIdx; + ui64 EndDelayIdx; + ui64 Idle; + ui64 Delay[BitsSize]; + + ui64 MaxLatencyIncreaseWithOneLessCpu(TCpuLoadLog<DataSize>** logs, i64 logCount, ui64 timeNs, ui64 periodNs) { + Y_VERIFY(logCount > 0); + ui64 endTimeNs = timeNs; + + ui64 lastTimeNs = timeNs; + for (i64 log_idx = 0; log_idx < logCount; ++log_idx) { + ui64 x = AtomicGet(logs[log_idx]->LastTimeNs); + if ((x & 1) == 1) { + lastTimeNs = Min(lastTimeNs, x); + } else { + logs[log_idx]->template RegisterBusyPeriod<false>(endTimeNs, x); + } + } + const ui64 beginTimeNs = periodNs < timeNs ? timeNs - periodNs : 0; + + ui64 beginIdx = beginTimeNs / BitDurationNs; + ui64 lastIdx = lastTimeNs / BitDurationNs; + ui64 beginElementIdx = beginIdx / 64; + ui64 lastElementIdx = lastIdx / 64; + + BeginDelayIdx = 0; + EndDelayIdx = 0; + Idle = 0; + ui64 maxDelay = 0; + ui64 bucket = 0; + for (ui64 idx = beginElementIdx; idx <= lastElementIdx; ++idx) { + ui64 i = idx % DataSize; + ui64 input = AtomicGet(logs[0]->Data[i]); + ui64 all_busy = ~0ull; + for (i64 log_idx = 1; log_idx < logCount; ++log_idx) { + ui64 x = AtomicGet(logs[log_idx]->Data[i]); + all_busy &= x; + } + if (!input) { + if (!bucket) { + Idle += 64 - PopCount(all_busy); + continue; + } + } + for (i64 bit_idx = 0; bit_idx < 64; ++bit_idx) { + ui64 x = (1ull << bit_idx); + if (all_busy & x) { + if (input & x) { + // Push into the queue + bucket++; + Delay[EndDelayIdx] = EndDelayIdx; + ++EndDelayIdx; + } else { + // All busy + } + } else { + if (input & x) { + // Move success + } else { + if (bucket) { + // Remove from the queue + bucket--; + ui64 stored = Delay[BeginDelayIdx]; + ++BeginDelayIdx; + ui64 delay = EndDelayIdx - stored; + maxDelay = Max(maxDelay, delay); + //Cerr << "bit_idx: " << bit_idx << " stored: " << stored << " delay: " << delay << Endl; + } else { + Idle++; + } + } + } + } + } + if (bucket) { + ui64 stored = Delay[BeginDelayIdx]; + ui64 delay = EndDelayIdx - stored; + maxDelay = Max(maxDelay, delay); + //Cerr << "last stored: " << stored << " delay: " << delay << Endl; + } + return maxDelay * BitDurationNs; + } +}; + diff --git a/library/cpp/actors/util/cpu_load_log_ut.cpp b/library/cpp/actors/util/cpu_load_log_ut.cpp new file mode 100644 index 00000000000..7109123c6e6 --- /dev/null +++ b/library/cpp/actors/util/cpu_load_log_ut.cpp @@ -0,0 +1,275 @@ +#include "cpu_load_log.h" + +#include <library/cpp/testing/unittest/registar.h> +#include <util/random/random.h> +#include <util/system/hp_timer.h> +#include <util/system/sanitizers.h> +#include <util/system/thread.h> + +Y_UNIT_TEST_SUITE(CpuLoadLog) { + + TString PrintBits(ui64 x) { + TStringStream str; + for (ui64 i = 0; i < 64; ++i) { + if (x & (1ull << i)) { + str << "1"; + } else { + str << "0"; + } + } + return str.Str(); + } + + Y_UNIT_TEST(FillAll) { + TCpuLoadLog<5> log(100*BitDurationNs); + log.RegisterBusyPeriod(101*BitDurationNs); + log.RegisterBusyPeriod(163*BitDurationNs); + log.RegisterBusyPeriod(164*BitDurationNs); + log.RegisterBusyPeriod(165*BitDurationNs); + log.RegisterBusyPeriod(331*BitDurationNs); + log.RegisterBusyPeriod(340*BitDurationNs); + log.RegisterBusyPeriod(420*BitDurationNs); + log.RegisterBusyPeriod(511*BitDurationNs); + //for (ui64 i = 0; i < 5; ++i) { + // Cerr << "i: " << i << " bits: " << PrintBits(log.Data[i]) << Endl; + //} + for (ui64 i = 0; i < 5; ++i) { + UNIT_ASSERT_C((ui64(log.Data[i]) == ~ui64(0)), "Unequal at " << i << "\n got: " << PrintBits(log.Data[i]) + << "\n expected: " << PrintBits(~ui64(0))); + } + } + + Y_UNIT_TEST(PartialFill) { + TCpuLoadLog<5> log(0*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b0ull)); + log.RegisterBusyPeriod(0*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b1ull)); + log.RegisterBusyPeriod(0*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b1ull)); + log.RegisterBusyPeriod(1*BitDurationNs/2); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b1ull)); + log.RegisterBusyPeriod(1*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b11ull)); + log.RegisterIdlePeriod(3*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b11ull)); + log.RegisterBusyPeriod(3*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0b1011ull)); + log.RegisterBusyPeriod(63*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits((~0ull)^0b0100ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0b0ull)); + log.RegisterBusyPeriod(128*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits((~0ull)^0b0100ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(0b1ull)); + log.RegisterBusyPeriod(1*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(~0ull)); + log.RegisterBusyPeriod(2*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(~0ull)); + log.RegisterBusyPeriod(64*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0b1ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(~0ull)); + log.RegisterIdlePeriod(128*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0b1ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(~0ull)); + log.RegisterIdlePeriod(192*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0b1ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(~0ull)); + log.RegisterBusyPeriod(192*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(~0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0b1ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(0b1ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(~0ull)); + log.RegisterIdlePeriod((192+5*64-1)*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(0b1ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(0ull)); + log.RegisterIdlePeriod((192+15*64)*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[0]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[1]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[2]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[3]), PrintBits(0ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log.Data[4]), PrintBits(0ull)); + } + + Y_UNIT_TEST(Estimator) { + TCpuLoadLog<5> *log[10]; + log[0] = new TCpuLoadLog<5>(0*BitDurationNs); + log[1] = new TCpuLoadLog<5>(0*BitDurationNs); + TMinusOneCpuEstimator<5> estimator; + + + for (ui64 i = 0; i < 5*64; i+=2) { + log[0]->RegisterIdlePeriod(i*BitDurationNs); + log[0]->RegisterBusyPeriod(i*BitDurationNs); + } + log[0]->RegisterIdlePeriod((5*64-2)*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log[0]->Data[0]), + PrintBits(0b0101010101010101010101010101010101010101010101010101010101010101ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log[0]->Data[4]), + PrintBits(0b0101010101010101010101010101010101010101010101010101010101010101ull)); + for (ui64 i = 0; i < 5*64-1; i+=2) { + log[1]->RegisterIdlePeriod((i+1)*BitDurationNs); + log[1]->RegisterBusyPeriod((i+1)*BitDurationNs); + } + log[1]->RegisterIdlePeriod((5*64-2+1)*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log[1]->Data[0]), + PrintBits(0b1010101010101010101010101010101010101010101010101010101010101010ull)); + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log[1]->Data[4]), + PrintBits(0b1010101010101010101010101010101010101010101010101010101010101010ull)); + + ui64 value = estimator.MaxLatencyIncreaseWithOneLessCpu(log, 2, (5*64)*BitDurationNs-1, 3*64*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(value/BitDurationNs, 1); + + value = estimator.MaxLatencyIncreaseWithOneLessCpu(log, 2, (5*64+10)*BitDurationNs, 3*64*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(value/BitDurationNs, 12); + + delete log[0]; + delete log[1]; + } + + Y_UNIT_TEST(Estimator2) { + TCpuLoadLog<5> *log[2]; + log[0] = new TCpuLoadLog<5>(0*BitDurationNs); + log[1] = new TCpuLoadLog<5>(0*BitDurationNs); + TMinusOneCpuEstimator<5> estimator; + + for (ui64 i = 0; i < 5*64; i+=2) { + log[0]->RegisterIdlePeriod(i*BitDurationNs); + log[0]->RegisterBusyPeriod(i*BitDurationNs); + } + for (ui64 i = 0; i < 5; ++i) { + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log[0]->Data[i]), + PrintBits(0b0101010101010101010101010101010101010101010101010101010101010101ull)); + } + for (ui64 i = 0; i < 5*64-1; i+=2) { + log[1]->RegisterIdlePeriod((i+1)*BitDurationNs); + log[1]->RegisterBusyPeriod((i+1)*BitDurationNs); + } + for (ui64 i = 0; i < 5; ++i) { + UNIT_ASSERT_VALUES_EQUAL(PrintBits(log[1]->Data[i]), + PrintBits(0b1010101010101010101010101010101010101010101010101010101010101010ull)); + } + + log[0]->Data[2] = ~0ull; + ui64 value = estimator.MaxLatencyIncreaseWithOneLessCpu(log, 2, (5*64-1)*BitDurationNs, 3*64*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(value/BitDurationNs, 32); + + delete log[0]; + delete log[1]; + } + + Y_UNIT_TEST(Estimator3) { + TCpuLoadLog<5> *log[3]; + log[0] = new TCpuLoadLog<5>(0*BitDurationNs); + log[1] = new TCpuLoadLog<5>(0*BitDurationNs); + log[2] = new TCpuLoadLog<5>(0*BitDurationNs); + TMinusOneCpuEstimator<5> estimator; + + for (ui64 i = 0; i < 5*64; i+=8) { + log[0]->RegisterIdlePeriod(i*BitDurationNs); + log[0]->RegisterBusyPeriod((i+3)*BitDurationNs); + log[1]->RegisterIdlePeriod(i*BitDurationNs); + log[1]->RegisterBusyPeriod((i+3)*BitDurationNs); + log[2]->RegisterIdlePeriod(i*BitDurationNs); + log[2]->RegisterBusyPeriod((i+3)*BitDurationNs); + } + for (ui64 i = 0; i < 5; ++i) { + for (ui64 n = 0; n < 3; ++n) { + UNIT_ASSERT_VALUES_EQUAL_C(PrintBits(log[n]->Data[i]), + PrintBits(0b0000111100001111000011110000111100001111000011110000111100001111ull), + " i: " << i << " n: " << n); + } + } + + ui64 value = estimator.MaxLatencyIncreaseWithOneLessCpu(log, 3, (5*64-5)*BitDurationNs, 3*64*BitDurationNs); + UNIT_ASSERT_VALUES_EQUAL(value/BitDurationNs, 4); + + delete log[0]; + delete log[1]; + delete log[2]; + } + /* + class TWorkerThread : public ISimpleThread { + private: + std::function<void()> Func; + double Time = 0.0; + + public: + TWorkerThread(std::function<void()> func) + : Func(std::move(func)) + { } + + double GetTime() const { + return Time; + } + + static THolder<TWorkerThread> Spawn(std::function<void()> func) { + THolder<TWorkerThread> thread = MakeHolder<TWorkerThread>(std::move(func)); + thread->Start(); + return thread; + } + + private: + void* ThreadProc() noexcept override { + THPTimer timer; + Func(); + Time = timer.Passed(); + return nullptr; + } + }; + + void DoConcurrentPushPop(size_t threads, ui64 perThreadCount) { + // Concurrency factor 4 is up to 16 threads + + auto workerFunc = [&](size_t threadIndex) { + }; + + TVector<THolder<TWorkerThread>> workers(threads); + for (size_t i = 0; i < threads; ++i) { + workers[i] = TWorkerThread::Spawn([workerFunc, i]() { + workerFunc(i); + }); + } + + double maxTime = 0; + for (size_t i = 0; i < threads; ++i) { + workers[i]->Join(); + maxTime = Max(maxTime, workers[i]->GetTime()); + } + + UNIT_ASSERT_VALUES_EQUAL(popped, 0u); + + Cerr << "Concurrent with " << threads << " threads: " << maxTime << " seconds" << Endl; + } + + void DoConcurrentPushPop_3times(size_t threads, ui64 perThreadCount) { + for (size_t i = 0; i < 3; ++i) { + DoConcurrentPushPop(threads, perThreadCount); + } + } + + static constexpr ui64 PER_THREAD_COUNT = NSan::PlainOrUnderSanitizer(1000000, 100000); + + Y_UNIT_TEST(ConcurrentPushPop_1thread) { DoConcurrentPushPop_3times(1, PER_THREAD_COUNT); } + */ +} diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp index 052009406f2..937e44ec9b9 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp @@ -892,7 +892,7 @@ void TPDisk::SendChunkWriteError(TChunkWrite &chunkWrite, const TString &errorRe //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// void TPDisk::SendChunkReadError(const TIntrusivePtr<TChunkRead>& read, TStringStream& error, NKikimrProto::EReplyStatus status) { - error << " for owner# " << read->Owner << " can't read chunkIdx# " << read->ChunkIdx; + error << " for ownerId# " << read->Owner << " can't read chunkIdx# " << read->ChunkIdx; Y_VERIFY(status != NKikimrProto::OK); LOG_ERROR_S(*ActorSystem, NKikimrServices::BS_PDISK, error.Str()); @@ -1659,7 +1659,7 @@ void TPDisk::ForceDeleteChunk(TChunkIdx chunkIdx) { switch (state.CommitState) { case TChunkState::DATA_ON_QUARANTINE: LOG_NOTICE_S(*ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# " << PDiskId - << " chunkIdx# " << chunkIdx << " owned by owner# " << state.OwnerId + << " chunkIdx# " << chunkIdx << " owned by ownerId# " << state.OwnerId << " is released from quarantine and marked as free at ForceDeleteChunk"); [[fallthrough]]; case TChunkState::DATA_RESERVED: @@ -1700,7 +1700,15 @@ void TPDisk::KillOwner(TOwner owner, TOwnerRound killOwnerRound, TCompletionEven " Line# %" PRIu32 " --CommitedDataChunks# %" PRIi64 " chunkIdx# %" PRIu32 " Marker# BPD84", (ui32)PDiskId, (ui32)__LINE__, (i64)Mon.CommitedDataChunks->Val(), (ui32)i); } - if (state.HasAnyOperationsInProgress() + if (state.CommitState == TChunkState::DATA_ON_QUARANTINE) { + if (!pushedOwnerIntoQuarantine) { + pushedOwnerIntoQuarantine = true; + QuarantineOwners.push_back(owner); + LOG_NOTICE_S(*ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# " << PDiskId + << " push ownerId# " << owner + << " into quarantine as there is a chunk in DATA_ON_QUARANTINE"); + } + } else if (state.HasAnyOperationsInProgress() || state.CommitState == TChunkState::DATA_RESERVED_DELETE_IN_PROGRESS || state.CommitState == TChunkState::DATA_COMMITTED_DELETE_IN_PROGRESS || state.CommitState == TChunkState::DATA_RESERVED_DELETE_ON_QUARANTINE @@ -1715,7 +1723,7 @@ void TPDisk::KillOwner(TOwner owner, TOwnerRound killOwnerRound, TCompletionEven } if (state.CommitState != TChunkState::DATA_RESERVED_DELETE_ON_QUARANTINE - && state.CommitState != TChunkState::DATA_COMMITTED_DELETE_ON_QUARANTINE) { + && state.CommitState != TChunkState::DATA_RESERVED_DELETE_IN_PROGRESS) { QuarantineChunks.push_back(i); } @@ -1985,7 +1993,7 @@ void TPDisk::ClearQuarantineChunks() { for (auto delIt = it; delIt != QuarantineOwners.end(); ++delIt) { Keeper.RemoveOwner(*delIt); LOG_NOTICE_S(*ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# " << PDiskId - << " removed owner# " << *delIt << " from chunks Keeper through QuarantineOwners"); + << " removed ownerId# " << *delIt << " from chunks Keeper through QuarantineOwners"); } QuarantineOwners.erase(it, QuarantineOwners.end()); *Mon.QuarantineOwners = QuarantineOwners.size(); @@ -2309,7 +2317,7 @@ void TPDisk::PrepareLogError(TLogWrite *logWrite, TStringStream& err, NKikimrPro return; } - err << " error in TLogWrite for owner# " << logWrite->Owner << " ownerRound# " << logWrite->OwnerRound + err << " error in TLogWrite for ownerId# " << logWrite->Owner << " ownerRound# " << logWrite->OwnerRound << " lsn# " << logWrite->Lsn; LOG_ERROR_S(*ActorSystem, NKikimrServices::BS_PDISK, err.Str()); @@ -2350,7 +2358,7 @@ bool TPDisk::PreprocessRequest(TRequestBase *request) { NKikimrProto::EReplyStatus errStatus = CheckOwnerAndRound(request, err); LOG_TRACE_S(*ActorSystem, NKikimrServices::BS_PDISK, "PreprocessRequest " << TypeName(*request) - << " from owner# " << request->Owner << " round# " << request->OwnerRound + << " from ownerId# " << request->Owner << " round# " << request->OwnerRound << " errStatus# " << errStatus); switch (request->GetType()) { diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp index af7720426b3..5db5cae52b7 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp @@ -882,7 +882,9 @@ NKikimrProto::EReplyStatus TPDisk::BeforeLoggingCommitRecord(const TLogWrite &lo state.CommitState = TChunkState::DATA_COMMITTED_DELETE_ON_QUARANTINE; break; default: - state.CommitState = TChunkState::DATA_ON_QUARANTINE; + Y_FAIL_S("PDiskID# " << PDiskId << " can't delete chunkIdx# " << chunkIdx + << " request ownerId# " << logWrite.Owner + << " with operations in progress as it is in unexpected CommitState# " << state.ToString()); break; } QuarantineChunks.push_back(chunkIdx); @@ -1007,7 +1009,6 @@ void TPDisk::DeleteChunk(ui32 chunkIdx, TOwner owner) { TChunkState &state = ChunkState[chunkIdx]; switch (state.CommitState) { // Chunk will be freed in TPDisk::ForceDeleteChunk() and may be released already - case TChunkState::FREE: case TChunkState::DATA_ON_QUARANTINE: break; case TChunkState::DATA_RESERVED_DELETE_IN_PROGRESS: @@ -1035,7 +1036,7 @@ void TPDisk::DeleteChunk(ui32 chunkIdx, TOwner owner) { break; default: Y_FAIL_S("PDiskID# " << PDiskId << " can't delete chunkIdx# " << chunkIdx - << " requesting ownerId# " << owner + << " requesting ownerId# " << owner << " as it is in unexpected CommitState# " << state.ToString()); } } diff --git a/ydb/core/blobstorage/ut_blobstorage/blob_depot.cpp b/ydb/core/blobstorage/ut_blobstorage/blob_depot.cpp index f5f9ae6d014..ca18c75fa58 100644 --- a/ydb/core/blobstorage/ut_blobstorage/blob_depot.cpp +++ b/ydb/core/blobstorage/ut_blobstorage/blob_depot.cpp @@ -1,1097 +1,91 @@ #include <ydb/core/blobstorage/ut_blobstorage/lib/env.h> #include <ydb/core/blob_depot/events.h> -#include <util/random/mersenne.h> -#include <util/random/random.h> +#include "blob_depot_event_managers.h" +#include "blob_depot_auxiliary_structures.h" +#include "blob_depot_test_functions.h" -#include <algorithm> -#include <random> - -#include <blob_depot_event_managers.h> -#include <blob_depot_auxiliary_structures.h> +#include <util/random/entropy.h> using namespace NKikimr::NBlobDepot; Y_UNIT_TEST_SUITE(BlobDepot) { - TMersenne<ui32> mt(13371); - TMersenne<ui64> mt64(0xdeadf00d); - const ui64 BLOB_DEPOT_TABLET_ID = MakeTabletID(1, 0, 0x10000); - - void ConfigureEnvironment(ui32 numGroups, std::unique_ptr<TEnvironmentSetup>& envPtr, std::vector<ui32>& regularGroups, ui32& blobDepot, ui32 nodeCount = 8, - TBlobStorageGroupType erasure = TBlobStorageGroupType::ErasureMirror3of4) { - envPtr = std::make_unique<TEnvironmentSetup>(TEnvironmentSetup::TSettings{ - .NodeCount = nodeCount, - .Erasure = erasure, - .SetupHive = true, - }); - - envPtr->CreateBoxAndPool(1, numGroups); - envPtr->Sim(TDuration::Seconds(20)); - - regularGroups = envPtr->GetGroups(); - - NKikimrBlobStorage::TConfigRequest request; - TString virtualPool = "virtual_pool"; - { - auto *cmd = request.AddCommand()->MutableDefineStoragePool(); - cmd->SetBoxId(1); - cmd->SetName(virtualPool); - cmd->SetErasureSpecies("none"); - cmd->SetVDiskKind("Default"); - } - { - auto *cmd = request.AddCommand()->MutableAllocateVirtualGroup(); - cmd->SetName("vg"); - cmd->SetHiveId(envPtr->Runtime->GetAppData()->DomainsInfo->HivesByHiveUid.begin()->second); - cmd->SetStoragePoolName(virtualPool); - auto *prof = cmd->AddChannelProfiles(); - prof->SetStoragePoolName(envPtr->StoragePoolName); - prof->SetCount(2); - prof = cmd->AddChannelProfiles(); - prof->SetStoragePoolName(envPtr->StoragePoolName); - prof->SetChannelKind(NKikimrBlobDepot::TChannelKind::Data); - prof->SetCount(2); - } - - auto response = envPtr->Invoke(request); - UNIT_ASSERT_C(response.GetSuccess(), response.GetErrorDescription()); - blobDepot = response.GetStatus(1).GetGroupId(0); - - envPtr->Sim(TDuration::Seconds(5)); // some time for blob depot to crank up - } - - void DecommitGroup(TEnvironmentSetup& env, ui32 groupId) { - TString blobDepotPool = "decommit_blob_depot_pool"; - ui32 blobDepotPoolId = 42; - env.CreatePoolInBox(1, blobDepotPoolId, blobDepotPool); - NKikimrBlobStorage::TConfigRequest request; - - auto *cmd = request.AddCommand()->MutableDecommitGroups(); - cmd->AddGroupIds(groupId); - cmd->SetHiveId(env.Runtime->GetAppData()->DomainsInfo->HivesByHiveUid.begin()->second); - auto *prof = cmd->AddChannelProfiles(); - prof->SetStoragePoolName(blobDepotPool); - prof->SetCount(2); - prof = cmd->AddChannelProfiles(); - prof->SetStoragePoolName(blobDepotPool); - prof->SetChannelKind(NKikimrBlobDepot::TChannelKind::Data); - prof->SetCount(2); - - auto response = env.Invoke(request); - UNIT_ASSERT_C(response.GetSuccess(), response.GetErrorDescription()); - } - - TString DataGen(ui32 len) { - TString res = ""; - for (ui32 i = 0; i < len; ++i) { - res += 'A' + mt.GenRand() % ('z' - 'A'); - } - return res; - } - - ui32 Rand(ui32 a, ui32 b) { - if (a >= b) { - return a; - } - return mt.GenRand() % (b - a) + a; - } - - ui32 Rand(ui32 b) { - return Rand(0, b); - } - - ui32 Rand() { - return mt.GenRand(); - } - - ui32 Rand64() { - return mt64.GenRand(); - } - - template <class T> - T& Rand(std::vector<T>& v) { - return v[Rand(v.size())]; - } - - ui32 SeedRand(ui32 a, ui32 b, ui32 seed) { - TMersenne<ui32> temp(seed); - if (a >= b) { - return a; - } - return temp.GenRand() % (b - a) + a; - } - - ui32 SeedRand(ui32 b, ui32 seed) { - return SeedRand(0, b, seed); - } - - template <class T> - const T& Rand(const std::vector<T>& v) { - return v[Rand(v.size())]; - } - - void TestBasicPutAndGet(TEnvironmentSetup& env, ui64 tabletId, ui32 groupId) { - std::vector<TBlobInfo> blobs; - TBSState state; - state[tabletId]; - - blobs.push_back(TBlobInfo(DataGen(100), tabletId, 1)); - blobs.push_back(TBlobInfo(DataGen(100), tabletId, 2)); - blobs.push_back(TBlobInfo(DataGen(200), tabletId, 1)); - - VerifiedGet(env, 1, groupId, blobs[0], true, false, 0, state); - VerifiedGet(env, 1, groupId, blobs[1], true, false, 0, state); - - VerifiedPut(env, 1, groupId, blobs[0], state); - VerifiedPut(env, 1, groupId, blobs[1], state); - - VerifiedGet(env, 1, groupId, blobs[0], false, false, 0, state); - VerifiedGet(env, 1, groupId, blobs[1], false, false, 0, state); - VerifiedGet(env, 1, groupId, blobs[2], false, false, 0, state); - - VerifiedGet(env, 1, groupId, blobs, false, false, 0, state); - - blobs.push_back(TBlobInfo(DataGen(1000), tabletId + (1 << 12), 1)); - VerifiedPut(env, 1, groupId, blobs[2], state); - VerifiedPut(env, 1, groupId, blobs[3], state); - - VerifiedGet(env, 1, groupId, blobs[2], false, false, 0, state); - VerifiedGet(env, 1, groupId, blobs[3], false, false, 0, state); - } - - TLogoBlobID MinBlobID(ui64 tablet) { - return TLogoBlobID(tablet, 0, 0, 0, 0, 0); - } - - TLogoBlobID MaxBlobID(ui64 tablet) { - return TLogoBlobID(tablet, Max<ui32>(), Max<ui32>(), NKikimr::TLogoBlobID::MaxChannel, - NKikimr::TLogoBlobID::MaxBlobSize, NKikimr::TLogoBlobID::MaxCookie, NKikimr::TLogoBlobID::MaxPartId, - NKikimr::TLogoBlobID::MaxCrcMode); - } - - void TestBasicRange(TEnvironmentSetup& env, ui64 tabletId, ui32 groupId) { - std::vector<TBlobInfo> blobs; - TBSState state; - state[tabletId]; - blobs.push_back(TBlobInfo(DataGen(100), tabletId, 1)); - blobs.push_back(TBlobInfo(DataGen(100), tabletId, 2)); - blobs.push_back(TBlobInfo(DataGen(200), tabletId, 1)); - - VerifiedPut(env, 1, groupId, blobs[0], state); - VerifiedPut(env, 1, groupId, blobs[1], state); - - VerifiedRange(env, 1, groupId, tabletId, MinBlobID(tabletId), MaxBlobID(tabletId), false, false, blobs, state); - VerifiedRange(env, 1, groupId, tabletId, MinBlobID(tabletId), MaxBlobID(tabletId), false, true, blobs, state); - - ui32 n = 100; - for (ui32 i = 0; i < n; ++i) { - blobs.push_back(TBlobInfo(DataGen(100), tabletId, 1000 + i)); - if (i % 2) { - VerifiedPut(env, 1, groupId, blobs[i], state); - } - } - - VerifiedRange(env, 1, groupId, tabletId, blobs[0].Id, blobs[n/2 - 1].Id, false, false, blobs, state); - VerifiedRange(env, 1, groupId, tabletId, blobs[0].Id, blobs[n/2 - 1].Id, false, true, blobs, state); - } - - void TestBasicDiscover(TEnvironmentSetup& env, ui64 tabletId, ui32 groupId) { - - std::vector<TBlobInfo> blobs; - ui64 tablet2 = tabletId + 1000; - TBSState state; - state[tabletId]; - state[tablet2]; - - blobs.push_back(TBlobInfo(DataGen(100), tabletId, 1, 2)); - blobs.push_back(TBlobInfo(DataGen(100), tabletId, 1, 3)); - blobs.push_back(TBlobInfo(DataGen(200), tabletId, 1, 4)); - blobs.push_back(TBlobInfo(DataGen(200), tabletId, 2, 4, 1)); - - VerifiedDiscover(env, 1, groupId, tabletId, 0, false, false, 0, true, blobs, state); - VerifiedDiscover(env, 1, groupId, tabletId, 1, false, false, 0, true, blobs, state); - VerifiedDiscover(env, 1, groupId, tabletId, 0, true, false, 0, true, blobs, state); - - VerifiedPut(env, 1, groupId, blobs[0], state); - VerifiedPut(env, 1, groupId, blobs[1], state); - - VerifiedDiscover(env, 1, groupId, tabletId, 0, false, false, 0, true, blobs, state); - VerifiedDiscover(env, 1, groupId, tabletId, 0, true, false, 0, true, blobs, state); - VerifiedDiscover(env, 1, groupId, tabletId, 0, true, false, 0, true, blobs, state); - - VerifiedDiscover(env, 1, groupId, tabletId, 100, true, false, 0, true, blobs, state); - - blobs.push_back(TBlobInfo(DataGen(1000), tablet2, 10, 2)); - VerifiedDiscover(env, 1, groupId, tablet2, 0, true, false, 0, true, blobs, state); - - VerifiedPut(env, 1, groupId, blobs[3], state); - VerifiedDiscover(env, 1, groupId, tablet2, 0, false, false, 0, true, blobs, state); - - VerifiedDiscover(env, 1, groupId, tablet2, 42, true, false, 0, true, blobs, state); - } - - void TestBasicBlock(TEnvironmentSetup& env, ui64 tabletId, ui32 groupId) { - ui32 tablet2 = tabletId + 1; - std::vector<TBlobInfo> blobs; - TBSState state; - state[tabletId]; - state[tablet2]; - - ui32 lastGen = 0; - - blobs.push_back(TBlobInfo(DataGen(100), tabletId, 1, lastGen++)); - blobs.push_back(TBlobInfo(DataGen(100), tabletId, 1, lastGen++)); - blobs.push_back(TBlobInfo(DataGen(100), tabletId, 1, lastGen++)); - blobs.push_back(TBlobInfo(DataGen(100), tabletId, 1, lastGen++)); - blobs.push_back(TBlobInfo(DataGen(100), tabletId, 1, lastGen++)); // blobs[4] - - ui32 lastGen2 = 1; - blobs.push_back(TBlobInfo(DataGen(100), tablet2, 1, lastGen2++, 1)); - blobs.push_back(TBlobInfo(DataGen(100), tablet2, 2, lastGen2++, 2)); - blobs.push_back(TBlobInfo(DataGen(100), tablet2, 3, lastGen2++, 3)); - - VerifiedPut(env, 1, groupId, blobs[2], state); - - VerifiedBlock(env, 1, groupId, tabletId, 3, state); - - VerifiedPut(env, 1, groupId, blobs[1], state); - VerifiedPut(env, 1, groupId, blobs[3], state); - VerifiedGet(env, 1, groupId, blobs[3], false, false, 0, state); - - VerifiedPut(env, 1, groupId, blobs[4], state); - VerifiedGet(env, 1, groupId, blobs[4], false, false, 0, state); - - VerifiedBlock(env, 1, groupId, tabletId, 2, state); - VerifiedBlock(env, 1, groupId, tabletId, 3, state); - - VerifiedPut(env, 1, groupId, blobs[5], state); - - VerifiedBlock(env, 1, groupId, tablet2, 2, state); - - VerifiedPut(env, 1, groupId, blobs[6], state); - VerifiedGet(env, 1, groupId, blobs[6], false, false, 0, state); - - VerifiedPut(env, 1, groupId, blobs[7], state); - VerifiedGet(env, 1, groupId, blobs[7], false, false, 0, state); - } - - void TestBasicCollectGarbage(TEnvironmentSetup& env, ui64 tabletId, ui32 groupId) { - std::vector<TBlobInfo> blobs; - ui64 tablet2 = tabletId + 1; - TBSState state; - state[tabletId]; - state[tablet2]; - - for (ui32 i = 0; i < 10; ++i) { - blobs.push_back(TBlobInfo(DataGen(100), tabletId, 1, 1, i + 1, 0)); - } - - for (ui32 i = 10; i < 20; ++i) { - blobs.push_back(TBlobInfo(DataGen(100), tabletId, 1, 1, i + 1, (i % 2))); - } - - for (ui32 i = 0; i < 10; ++i) { - blobs.push_back(TBlobInfo(DataGen(100), tabletId, 1, 2, i + 1, 0)); - } - - for (ui32 i = 0; i < 10; ++i) { - blobs.push_back(TBlobInfo(DataGen(100), tabletId, 1, 3 + i, 1, 0)); - } - - for (ui32 i = 0; i < 5; ++i) { - blobs.push_back(TBlobInfo(DataGen(100), tablet2, 1, 1, 1 + i, 0)); - } - - for (ui32 i = 0; i < 5; ++i) { - blobs.push_back(TBlobInfo(DataGen(100), tablet2, 1, 2 + i, 1, 0)); - } - - // blobs[0]..blobs[39] - tabletId - // blobs[40]..blobs[49] - tablet2 - - for (auto& blob : blobs) { - VerifiedPut(env, 1, groupId, blob, state); - } - - ui32 gen = 2; - ui32 perGenCtr = 1; - - VerifiedCollectGarbage(env, 1, groupId, tabletId, gen, perGenCtr++, 0, true, 1, 2, nullptr, nullptr, false, false, - blobs, state); - VerifiedGet(env, 1, groupId, blobs[0], false, false, 0, state); - VerifiedGet(env, 1, groupId, blobs[1], false, false, 0, state); - VerifiedGet(env, 1, groupId, blobs[2], false, false, 0, state); - - VerifiedGet(env, 1, groupId, blobs[20], false, false, 0, state); - VerifiedGet(env, 1, groupId, blobs[30], false, false, 0, state); - VerifiedGet(env, 1, groupId, blobs[31], false, false, 0, state); - VerifiedGet(env, 1, groupId, blobs[40], false, false, 0, state); - - VerifiedCollectGarbage(env, 1, groupId, tabletId, gen, perGenCtr++, 0, true, 1, 1, nullptr, nullptr, false, false, blobs, state); - - { - TBlobInfo blob(DataGen(100), tabletId, 99, 1, 1, 0); - VerifiedPut(env, 1, groupId, blob, state); - blobs.push_back(blob); - } - - VerifiedCollectGarbage(env, 1, groupId, tabletId, gen, perGenCtr++, 0, true, 1, 3, nullptr, nullptr, false, true, - blobs, state); - - { - TBlobInfo blob(DataGen(100), tabletId, 99, 1, 3, 0); - VerifiedPut(env, 1, groupId, blob, state); - blobs.push_back(blob); - } - VerifiedRange(env, 1, groupId, tabletId, blobs[1].Id, blobs[1].Id, false, false, blobs, state); - - VerifiedGet(env, 1, groupId, blobs[1], false, false, 0, state); - VerifiedGet(env, 1, groupId, blobs[2], false, false, 0, state); - VerifiedGet(env, 1, groupId, blobs[3], false, false, 0, state); - - VerifiedGet(env, 1, groupId, blobs[20], false, false, 0, state); - VerifiedGet(env, 1, groupId, blobs[30], false, false, 0, state); - VerifiedGet(env, 1, groupId, blobs[31], false, false, 0, state); - VerifiedGet(env, 1, groupId, blobs[40], false, false, 0, state); - - VerifiedCollectGarbage(env, 1, groupId, tabletId, gen, perGenCtr++, 0, true, 1, 1, nullptr, nullptr, false, true, blobs, state); - - VerifiedCollectGarbage(env, 1, groupId, tabletId, gen, perGenCtr++, 0, false, 1, 5, - new TVector<TLogoBlobID>({blobs[4].Id, blobs[5].Id}), - nullptr, - false, false, - blobs, state); - - VerifiedGet(env, 1, groupId, blobs[4], false, false, 0, state); - VerifiedGet(env, 1, groupId, blobs[5], false, false, 0, state); - - VerifiedCollectGarbage(env, 1, groupId, tabletId, gen, perGenCtr++, 0, false, 1, 6, - nullptr, - new TVector<TLogoBlobID>({blobs[4].Id, blobs[5].Id}), - false, false, - blobs, state); - VerifiedGet(env, 1, groupId, blobs[4], false, false, 0, state); - VerifiedGet(env, 1, groupId, blobs[5], false, false, 0, state); - - - VerifiedCollectGarbage(env, 1, groupId, tabletId, gen, perGenCtr++, 0, true, 1, 15, nullptr, nullptr, false, true, blobs, state); - - VerifiedRange(env, 1, groupId, tabletId, blobs[10].Id, blobs[19].Id, false, false, blobs, state); - - gen++; - perGenCtr = 1; - VerifiedCollectGarbage(env, 1, groupId, tabletId, gen + 1, perGenCtr++, 0, true, 2, 1, nullptr, nullptr, false, false, blobs, state); - VerifiedGet(env, 1, groupId, blobs[18], false, false, 0, state); - VerifiedGet(env, 1, groupId, blobs[19], false, false, 0, state); - VerifiedGet(env, 1, groupId, blobs[20], false, false, 0, state); - VerifiedGet(env, 1, groupId, blobs[21], false, false, 0, state); - VerifiedGet(env, 1, groupId, blobs[30], false, false, 0, state); - VerifiedGet(env, 1, groupId, blobs[31], false, false, 0, state); - VerifiedGet(env, 1, groupId, blobs[40], false, false, 0, state); - - VerifiedCollectGarbage(env, 1, groupId, tabletId, 6, 1, 0, true, 2, 1, nullptr, nullptr, false, false, blobs, state); - - VerifiedRange(env, 1, groupId, tabletId, blobs[0].Id, blobs[39].Id, false, false, blobs, state); - VerifiedRange(env, 1, groupId, tablet2, blobs[40].Id, blobs[49].Id, false, false, blobs, state); - - VerifiedCollectGarbage(env, 1, groupId, tabletId, 7, 2, 0, true, 3, 1, nullptr, nullptr, false, true, blobs, state); - - VerifiedRange(env, 1, groupId, tabletId, blobs[0].Id, blobs[39].Id, false, false, blobs, state); - - VerifiedBlock(env, 1, groupId, tabletId, 10, state); - VerifiedCollectGarbage(env, 1, groupId, tabletId, 7, 1, 0, true, 100, 1, nullptr, nullptr, false, true, blobs, state); - VerifiedGet(env, 1, groupId, blobs[39], false, false, 0, state); - } - - void TestRestoreGet(TEnvironmentSetup& env, ui64 tabletId, ui32 groupId, ui32 blobsNum, std::vector<TActorId>* vdisks) { - std::vector<TBlobInfo> blobs; - TBSState state; - state[tabletId]; - - std::vector<TActorId> allVdisks = *vdisks; - std::mt19937 g; - std::shuffle(allVdisks.begin(), allVdisks.end(), g); - - std::vector<TActorId> brokenVdisks = { allVdisks[0], allVdisks[1], allVdisks[2], allVdisks[3] }; - auto blockedEventType = TEvBlobStorage::TEvVPut::EventType; - env.Runtime->FilterFunction = [&](ui32 /*nodeId*/, std::unique_ptr<IEventHandle>& ev) { - if (ev->GetTypeRewrite() == blockedEventType) { - for (auto vdisk : brokenVdisks) { - if (ev->Recipient == vdisk) { - return false; - } - } - } - return true; - }; - - for (ui32 i = 0; i < blobsNum; ++i) { - blobs.push_back(TBlobInfo(DataGen(100), tabletId, 1 + i, 1, 1, 0)); - } - - for (ui32 i = 0; i < blobsNum; ++i) { - VerifiedPut(env, 1, groupId, blobs[i], state); - } - - brokenVdisks = { allVdisks[0], allVdisks[1] }; - blockedEventType = TEvBlobStorage::TEvVGet::EventType; - - for (ui32 i = 0; i < blobsNum; ++i) { - VerifiedGet(env, 1, groupId, blobs[i], true, false, 0, state, false); - } - - blockedEventType = TEvBlobStorage::TEvVGet::EventType; - brokenVdisks = { allVdisks[4], allVdisks[5] }; - - for (ui32 i = 0; i < blobsNum; ++i) { - if (blobs[i].Status == TBlobInfo::EStatus::WRITTEN) { - VerifiedGet(env, 1, groupId, blobs[i], false, false, 0, state, false); - } - } - } - - void TestRestoreDiscover(TEnvironmentSetup& env, ui64 tabletId, ui32 groupId, ui32 blobsNum, std::vector<TActorId>* vdisks) { - std::vector<TBlobInfo> blobs; - TBSState state; - state[tabletId]; - - std::vector<TActorId> allVdisks = *vdisks; - std::mt19937 g; - std::shuffle(allVdisks.begin(), allVdisks.end(), g); - - std::vector<TActorId> brokenVdisks = { allVdisks[0], allVdisks[1], allVdisks[2], allVdisks[3] }; - auto blockedEventType = TEvBlobStorage::TEvVPut::EventType; - env.Runtime->FilterFunction = [&](ui32 /*nodeId*/, std::unique_ptr<IEventHandle>& ev) { - if (ev->GetTypeRewrite() == blockedEventType) { - for (auto vdisk : brokenVdisks) { - if (ev->Recipient == vdisk) { - return false; - } - } - } - return true; - }; - - for (ui32 i = 0; i < blobsNum; ++i) { - blobs.push_back(TBlobInfo(DataGen(100), tabletId, 1, 1, 1 + i, 0)); - brokenVdisks = { allVdisks[0], allVdisks[1], allVdisks[2], allVdisks[3] }; - blockedEventType = TEvBlobStorage::TEvVPut::EventType; - VerifiedPut(env, 1, groupId, blobs[i], state); - brokenVdisks = { allVdisks[0], allVdisks[1] }; - blockedEventType = TEvBlobStorage::TEvVGet::EventType; - VerifiedDiscover(env, 1, groupId, tabletId, 0, true, false, 0, false, blobs, state, false); - } - - for (ui32 i = blobsNum; i < 2 * blobsNum; ++i) { - blobs.push_back(TBlobInfo(DataGen(100), tabletId, 1, 1, 1 + i, 0)); - brokenVdisks = { allVdisks[0], allVdisks[1], allVdisks[2], allVdisks[3] }; - blockedEventType = TEvBlobStorage::TEvVPut::EventType; - VerifiedPut(env, 1, groupId, blobs[i], state); - brokenVdisks = { allVdisks[0], allVdisks[1] }; - blockedEventType = TEvBlobStorage::TEvVGet::EventType; - VerifiedDiscover(env, 1, groupId, tabletId, 0, false, false, 0, false, blobs, state, false); - } - - blockedEventType = TEvBlobStorage::TEvVGet::EventType; - brokenVdisks = { allVdisks[4], allVdisks[5] }; - - for (ui32 i = 0; i < blobsNum * 2; ++i) { - if (blobs[i].Status == TBlobInfo::EStatus::WRITTEN) { - VerifiedGet(env, 1, groupId, blobs[i], false, false, 0, state, false); - } - } - } - - void TestRestoreRange(TEnvironmentSetup& env, ui64 tabletId, ui32 groupId, ui32 blobsNum, std::vector<TActorId>* vdisks) { - std::vector<TBlobInfo> blobs; - TBSState state; - state[tabletId]; - - std::vector<TActorId> allVdisks = *vdisks; - std::mt19937 g; - std::shuffle(allVdisks.begin(), allVdisks.end(), g); - - std::vector<TActorId> brokenVdisks = { allVdisks[0], allVdisks[1], allVdisks[2], allVdisks[3] }; - auto blockedEventType = TEvBlobStorage::TEvVPut::EventType; - env.Runtime->FilterFunction = [&](ui32 /*nodeId*/, std::unique_ptr<IEventHandle>& ev) { - if (ev->GetTypeRewrite() == blockedEventType) { - for (auto vdisk : brokenVdisks) { - if (ev->Recipient == vdisk) { - return false; - } - } - } - return true; - }; - - for (ui32 i = 0; i < blobsNum; ++i) { - blobs.push_back(TBlobInfo(DataGen(100), tabletId, Rand(NKikimr::TLogoBlobID::MaxCookie), 1, 1 + i, 0)); - VerifiedPut(env, 1, groupId, blobs[i], state); - } - - for (ui32 i = blobsNum; i < 2 * blobsNum; ++i) { - blobs.push_back(TBlobInfo(DataGen(100), tabletId, Rand(NKikimr::TLogoBlobID::MaxCookie), 1, 1 + i, 0)); - VerifiedPut(env, 1, groupId, blobs[i], state); - } - - blockedEventType = TEvBlobStorage::TEvVGet::EventType; - brokenVdisks = { allVdisks[0], allVdisks[1] }; - - VerifiedRange(env, 1, groupId, tabletId, blobs[0].Id, blobs[blobsNum - 1].Id, true, false, blobs, state, false); - VerifiedRange(env, 1, groupId, tabletId, blobs[blobsNum].Id, blobs[2 * blobsNum - 1].Id, true, true, blobs, state, false); - - blockedEventType = TEvBlobStorage::TEvVGet::EventType; - brokenVdisks = { allVdisks[4], allVdisks[5] }; - - for (ui32 i = 0; i < 2 * blobsNum; ++i) { - if (blobs[i].Status == TBlobInfo::EStatus::WRITTEN) { - VerifiedGet(env, 1, groupId, blobs[i], false, false, 0, state, false); - } - } - } - - void TestVerifiedRandom(TEnvironmentSetup& env, ui32 nodeCount, ui64 tabletId0, ui32 groupId, ui32 iterationsNum, ui32 decommitStep = 1e9) { - enum EActions { - ALTER = 0, - PUT, - GET, - MULTIGET, - RANGE, - BLOCK, - DISCOVER, - COLLECT_GARBAGE_HARD, - COLLECT_GARBAGE_SOFT, - RESTART_BLOB_DEPOT, - }; - std::vector<ui32> probs = { 10, 10, 3, 3, 2, 1, 1, 3, 3, 1 }; - TIntervals act(probs); - - std::vector<ui64> tablets = {tabletId0, tabletId0 + 1, tabletId0 + 2}; - std::vector<ui32> tabletGen = {1, 1, 1}; - std::vector<ui32> tabletStep = {1, 1, 1}; - std::vector<ui32> channels = {0, 1, 2}; - - std::vector<TBlobInfo> blobs; - - blobs.push_back(TBlobInfo("junk", 999, 999, 1, 1, 0)); - - TBSState state; - for (ui32 i = 0; i < tablets.size(); ++i) { - state[tablets[i]]; - } - - ui32 perGenCtr = 0; - - for (ui32 iteration = 0; iteration < iterationsNum; ++iteration) { - if (iteration == decommitStep) { - DecommitGroup(env, groupId); - continue; - } - ui32 tablet = Rand(tablets.size()); - ui32 tabletId = tablets[tablet]; - ui32 channel = Rand(channels); - ui32& gen = tabletGen[tablet]; - ui32& step = tabletStep[tablet]; - ui32 node = Rand(1, nodeCount); - - ui32 softCollectGen = state[tabletId].Channels[channel].SoftCollectGen; - ui32 softCollectStep = state[tabletId].Channels[channel].SoftCollectStep; - ui32 hardCollectGen = state[tabletId].Channels[channel].HardCollectGen; - ui32 hardCollectStep = state[tabletId].Channels[channel].HardCollectStep; - - ui32 action = act.GetInterval(Rand(act.UpperLimit())); - // Cerr << action << Endl; - switch (action) { - case EActions::ALTER: - { - if (Rand(3) == 0) { - gen += Rand(1, 2); - perGenCtr = 0; - } else { - step += Rand(1, 2); - } - } - break; - - case EActions::PUT: - { - ui32 cookie = Rand(NKikimr::TLogoBlobID::MaxCookie); - TBlobInfo blob(DataGen(Rand(50, 1000)), tabletId, cookie, gen, step, channel); - VerifiedPut(env, node, groupId, blob, state); - blobs.push_back(blob); - } - break; - - case EActions::GET: - { - TBlobInfo& blob = Rand(blobs); - bool mustRestoreFirst = Rand(2); - bool indexOnly = Rand(2); - ui32 forceBlockedGeneration = 0; - VerifiedGet(env, node, groupId, blob, mustRestoreFirst, indexOnly, forceBlockedGeneration, state); - } - break; - - case EActions::MULTIGET: - { - std::vector<TBlobInfo> getBlobs; - ui32 requestSize = Rand(50, 100); - for (ui32 i = 0; i < blobs.size() && i < requestSize; ++i) { - TBlobInfo& blob = Rand(blobs); - if (blob.Id.TabletID() == tabletId) { - getBlobs.push_back(blob); - } - } - - if (getBlobs.empty()) { - getBlobs.push_back(blobs[0]); - } - - bool mustRestoreFirst = Rand(2); - bool indexOnly = Rand(2); - ui32 forceBlockedGeneration = 0; - VerifiedGet(env, node, groupId, getBlobs, mustRestoreFirst, indexOnly, forceBlockedGeneration, state); - } - break; - - case EActions::RANGE: - { - TLogoBlobID r1 = Rand(blobs).Id; - TLogoBlobID r2 = Rand(blobs).Id; - - TLogoBlobID from(tabletId, r1.Generation(), r1.Step(), r1.Channel(), r1.BlobSize(), r1.Cookie()); - TLogoBlobID to(tabletId, r2.Generation(), r2.Step(), r2.Channel(), r2.BlobSize(), r2.Cookie()); - - if (from > to) { - std::swap(from, to); - } - - bool mustRestoreFirst = Rand(2); - bool indexOnly = Rand(2); - VerifiedRange(env, node, groupId, tabletId, from, to, mustRestoreFirst, indexOnly, blobs, state); - } - break; - - case EActions::BLOCK: - { - ui32 prevBlockedGen = state[tabletId].BlockedGen; - ui32 tryBlock = prevBlockedGen + Rand(4); - if (tryBlock > 0) { - tryBlock -= 1; - } - - VerifiedBlock(env, node, groupId, tabletId, tryBlock, state); - } - break; - - - case EActions::DISCOVER: - { - ui32 minGeneration = Rand(0, gen + 2); - bool readBody = Rand(2); - bool discoverBlockedGeneration = Rand(2); - ui32 forceBlockedGeneration = 0; - bool fromLeader = Rand(2); - - VerifiedDiscover(env, node, groupId, tabletId, minGeneration, readBody, discoverBlockedGeneration, forceBlockedGeneration, - fromLeader, blobs, state); - } - break; - - case EActions::COLLECT_GARBAGE_HARD: - { - ui32 tryGen = hardCollectGen + Rand(2); - ui32 tryStep = 0; - if (tryGen > 0 && !Rand(3)) { tryGen -= 1; } - if (tryGen > hardCollectGen) { - tryStep = Rand(hardCollectStep / 2); - } else { - tryStep = hardCollectStep + Rand(2); - if (tryStep > 0 && !Rand(3)) { tryStep -= 1; } - } - - bool collect = Rand(2); - bool isMultiCollectAllowed = Rand(2); - - THolder<TVector<TLogoBlobID>> keep(new TVector<TLogoBlobID>()); - THolder<TVector<TLogoBlobID>> doNotKeep(new TVector<TLogoBlobID>()); - - for (auto& blob : blobs) { - if (blob.Status == TBlobInfo::EStatus::WRITTEN) { - if (!Rand(5)) { - keep->push_back(blob.Id); - } else if (Rand(2)) { - doNotKeep->push_back(blob.Id); - } - } - } - - if (keep->size() == 0 && doNotKeep->size() == 0) { - collect = true; - } - - VerifiedCollectGarbage(env, node, groupId, tabletId, gen, perGenCtr++, channel, collect, - tryGen, tryStep, keep.Release(), doNotKeep.Release(), isMultiCollectAllowed, true, blobs, state); - } - break; - - case EActions::COLLECT_GARBAGE_SOFT: - { - ui32 tryGen = softCollectGen + Rand(2); - ui32 tryStep = 0; - if (tryGen > 0 && !Rand(3)) { tryGen -= 1; } - if (tryGen > softCollectGen) { - tryStep = Rand(softCollectStep / 2); - } else { - tryStep = softCollectStep + Rand(2); - if (tryStep > 0 && !Rand(3)) { tryStep -= 1; } - } - - bool collect = Rand(2); - bool isMultiCollectAllowed = Rand(2); - - THolder<TVector<TLogoBlobID>> keep(new TVector<TLogoBlobID>()); - THolder<TVector<TLogoBlobID>> doNotKeep(new TVector<TLogoBlobID>()); - - for (auto& blob : blobs) { - if (blob.Status == TBlobInfo::EStatus::WRITTEN) { - if (!Rand(5)) { - keep->push_back(blob.Id); - } else if (Rand(2)) { - doNotKeep->push_back(blob.Id); - } - } - } - - if (keep->size() == 0 && doNotKeep->size() == 0) { - collect = true; - } - - VerifiedCollectGarbage(env, node, groupId, tabletId, gen, perGenCtr++, channel, collect, - tryGen, tryStep, keep.Release(), doNotKeep.Release(), isMultiCollectAllowed, false, blobs, state); - } - break; - case EActions::RESTART_BLOB_DEPOT: - { - auto edge = env.Runtime->AllocateEdgeActor(node); - env.Runtime->WrapInActorContext(edge, [&] { - TActivationContext::Register(CreateTabletKiller(BLOB_DEPOT_TABLET_ID)); - }); - env.Runtime->DestroyActor(edge); - } - break; - - default: - UNIT_FAIL("TIntervals failed"); - } - } - } - - void TestLoadPutAndGet(TEnvironmentSetup& env, ui64 tabletId, ui32 groupId, ui32 blobsNum, ui32 maxBlobSize, ui32 readsNum, bool decommit = false) { - enum EActions { - GET, - MULTIGET, - RANGE, - DISCOVER, - CATCH_ALL, - RESTART_BLOB_DEPOT, - }; - std::vector<ui32> probs = { 5, 1, 5, 5, 1, 1 }; - TIntervals act(probs); - - std::vector<TBlobInfo> blobs; - std::map<TLogoBlobID, TBlobInfo*> mappedBlobs; - TBSState state; - state[tabletId]; - - TActorId edge = env.Runtime->AllocateEdgeActor(1); - - blobs.reserve(blobsNum); - - std::map<ui64, std::shared_ptr<TEvArgs>> requests; - - ui32 getCtr = 0; - ui32 rangeCtr = 0; - ui32 discoverCtr = 0; - - for (ui32 i = 0; i < blobsNum; ++i) { - blobs.push_back(TBlobInfo(DataGen(Rand(1, maxBlobSize)), tabletId, Rand(NKikimr::TLogoBlobID::MaxCookie), 1, 1, 0)); - mappedBlobs[blobs[i].Id] = &blobs[i]; - } - - for (ui32 i = 0; i < blobsNum; ++i) { - SendTEvPut(env, edge, groupId, blobs[i].Id, blobs[i].Data); - } - - for (ui32 i = 0; i < blobsNum; ++i) { - auto res = CaptureTEvPutResult(env, edge, false); - UNIT_ASSERT_C(res->Get(), TStringBuilder() << "Fail on iteration# " << i); - auto it = mappedBlobs.find(res->Get()->Id); - if (it == mappedBlobs.end()) { - UNIT_FAIL("Put nonexistent blob"); - } - VerifyTEvPutResult(res.Release(), *it->second, state); - } - - if (decommit) { - DecommitGroup(env, groupId); - } - - for (ui32 iteration = 0; iteration < readsNum; ++iteration) { - ui32 action = act.GetInterval(Rand(act.UpperLimit())); - if (iteration == readsNum - 1) { - action = 4; - } - ui64 cookie = Rand64(); - // Cerr << action << Endl; - switch (action) { - case EActions::GET: - { - ui32 blobNum = Rand(1 , blobsNum); - bool mustRestoreFirst = Rand(2); - bool indexOnly = Rand(2); - ui32 forceBlockedGeneration = 0; - SendTEvGet(env, edge, groupId, blobs[blobNum].Id, mustRestoreFirst, indexOnly, forceBlockedGeneration, cookie); - getCtr++; - requests[cookie] = std::make_shared<TEvGetArgs>(mustRestoreFirst, indexOnly, forceBlockedGeneration); - } - break; - - case EActions::MULTIGET: - { - ui32 blobsInRequest = Rand(1, 33); - std::vector<TBlobInfo> request; - for (ui32 i = 0; i < blobsInRequest; ++i) { - request.push_back(Rand(blobs)); - } - - bool mustRestoreFirst = Rand(2); - bool indexOnly = Rand(2); - ui32 forceBlockedGeneration = 0; - SendTEvGet(env, edge, groupId, request, mustRestoreFirst, indexOnly, forceBlockedGeneration, cookie); - getCtr++; - requests[cookie] = std::make_shared<TEvGetArgs>(mustRestoreFirst, indexOnly, forceBlockedGeneration); - } - break; - - case EActions::RANGE: - { - TLogoBlobID from = Rand(blobs).Id; - TLogoBlobID to = Rand(blobs).Id; - if (from > to) { - std::swap(from, to); - } - - bool mustRestoreFirst = false; - bool indexOnly = Rand(2); - SendTEvRange(env, edge, groupId, tabletId, from, to, mustRestoreFirst, indexOnly, cookie); - rangeCtr++; - requests[cookie] = std::make_shared<TEvRangeArgs>(mustRestoreFirst, indexOnly); - } - break; - - case EActions::DISCOVER: - { - ui32 minGeneration = 0; - bool readBody = Rand(2); - bool discoverBlockedGeneration = Rand(2); - ui32 forceBlockedGeneration = 0; - bool fromLeader = Rand(2); - - SendTEvDiscover(env, edge, groupId, tabletId, minGeneration, readBody, discoverBlockedGeneration, forceBlockedGeneration, - fromLeader, cookie); - discoverCtr++; - requests[cookie] = std::make_shared<TEvDiscoverArgs>(minGeneration, readBody, discoverBlockedGeneration, forceBlockedGeneration, fromLeader); - } - break; - - case EActions::CATCH_ALL: - { - // Cerr << getCtr << ' ' << rangeCtr << ' ' << discoverCtr << Endl; - while (getCtr + rangeCtr + discoverCtr) { - auto ev = CaptureAnyResult(env, edge); - UNIT_ASSERT_C(ev, TStringBuilder() << "Event lost, expected " << getCtr << " TEvGetResult's, " << rangeCtr << " TEvRangeResult's, " << discoverCtr << " TEvDiscoverResult's"); - switch (ev->GetTypeRewrite()) { - case TEvBlobStorage::TEvGetResult::EventType: - { - std::unique_ptr<TEventHandle<TEvBlobStorage::TEvGetResult>> res(reinterpret_cast<TEventHandle<TEvBlobStorage::TEvGetResult>*>(ev.release())); - UNIT_ASSERT(res); - std::vector<TBlobInfo> response; - ui32 responseSz = res->Get()->ResponseSz; - for (ui32 i = 0; i < responseSz; ++i) { - response.push_back(*mappedBlobs[res->Get()->Responses[i].Id]); - } - TEvGetArgs args = *requests[res->Cookie]->Get<TEvGetArgs>(); - VerifyTEvGetResult(res.release(), response, args.MustRestoreFirst, args.IndexOnly, args.ForceBlockedGeneration, state); - } - --getCtr; - break; - case TEvBlobStorage::TEvRangeResult::EventType: - { - std::unique_ptr<TEventHandle<TEvBlobStorage::TEvRangeResult>> res(reinterpret_cast<TEventHandle<TEvBlobStorage::TEvRangeResult>*>(ev.release())); - UNIT_ASSERT(res); - TLogoBlobID from = res->Get()->From; - TLogoBlobID to = res->Get()->To; - TEvRangeArgs args = *requests[res->Cookie]->Get<TEvRangeArgs>(); - VerifyTEvRangeResult(res.release(), tabletId, from, to, args.MustRestoreFirst, args.IndexOnly, blobs, state); - } - --rangeCtr; - break; - case TEvBlobStorage::TEvDiscoverResult::EventType: - { - std::unique_ptr<TEventHandle<TEvBlobStorage::TEvDiscoverResult>> res(reinterpret_cast<TEventHandle<TEvBlobStorage::TEvDiscoverResult>*>(ev.release())); - UNIT_ASSERT(res); - UNIT_ASSERT(res->Get()); - TEvDiscoverArgs args = *requests[res->Cookie]->Get<TEvDiscoverArgs>(); - VerifyTEvDiscoverResult(res.release(), tabletId, args.MinGeneration, args.ReadBody, args.DiscoverBlockedGeneration, - args.ForceBlockedGeneration, args.FromLeader, blobs, state); - } - --discoverCtr; - break; - } - } - } - break; - - case EActions::RESTART_BLOB_DEPOT: - // Cerr << "RESTART" << Endl; - { - env.Runtime->WrapInActorContext(edge, [&] { - TActivationContext::Register(CreateTabletKiller(BLOB_DEPOT_TABLET_ID)); - }); - } - break; - - default: - UNIT_FAIL("TIntervals failed"); - } - } - } - Y_UNIT_TEST(BasicPutAndGet) { - std::unique_ptr<TEnvironmentSetup> envPtr; - std::vector<ui32> regularGroups; - ui32 blobDepot; - ConfigureEnvironment(1, envPtr, regularGroups, blobDepot, 8, TBlobStorageGroupType::Erasure4Plus2Block); + ui32 seed; + Seed().LoadOrFail(&seed, sizeof(seed)); + TBlobDepotTestEnvironment tenv(seed); - TestBasicPutAndGet(*envPtr, 1, regularGroups[0]); - TestBasicPutAndGet(*envPtr, 11, blobDepot); + TestBasicPutAndGet(tenv, 1, tenv.RegularGroups[0]); + TestBasicPutAndGet(tenv, 11, tenv.BlobDepot); } Y_UNIT_TEST(BasicRange) { - std::unique_ptr<TEnvironmentSetup> envPtr; - std::vector<ui32> regularGroups; - ui32 blobDepot; - ConfigureEnvironment(1, envPtr, regularGroups, blobDepot, 8, TBlobStorageGroupType::Erasure4Plus2Block); + ui32 seed; + Seed().LoadOrFail(&seed, sizeof(seed)); + TBlobDepotTestEnvironment tenv(seed); - TestBasicRange(*envPtr, 1, regularGroups[0]); - TestBasicRange(*envPtr, 100, blobDepot); + TestBasicRange(tenv, 1, tenv.RegularGroups[0]); + TestBasicRange(tenv, 100, tenv.BlobDepot); } Y_UNIT_TEST(BasicDiscover) { - std::unique_ptr<TEnvironmentSetup> envPtr; - std::vector<ui32> regularGroups; - ui32 blobDepot; - ConfigureEnvironment(1, envPtr, regularGroups, blobDepot, 8, TBlobStorageGroupType::Erasure4Plus2Block); + ui32 seed; + Seed().LoadOrFail(&seed, sizeof(seed)); + TBlobDepotTestEnvironment tenv(seed); - TestBasicDiscover(*envPtr, 1000, regularGroups[0]); - TestBasicDiscover(*envPtr, 100, blobDepot); + TestBasicDiscover(tenv, 1000, tenv.RegularGroups[0]); + TestBasicDiscover(tenv, 100, tenv.BlobDepot); } Y_UNIT_TEST(BasicBlock) { - std::unique_ptr<TEnvironmentSetup> envPtr; - std::vector<ui32> regularGroups; - ui32 blobDepot; - ConfigureEnvironment(1, envPtr, regularGroups, blobDepot, 8, TBlobStorageGroupType::Erasure4Plus2Block); + ui32 seed; + Seed().LoadOrFail(&seed, sizeof(seed)); + TBlobDepotTestEnvironment tenv(seed); - TestBasicBlock(*envPtr, 15, regularGroups[0]); - TestBasicBlock(*envPtr, 100, blobDepot); + TestBasicBlock(tenv, 15, tenv.RegularGroups[0]); + TestBasicBlock(tenv, 100, tenv.BlobDepot); } Y_UNIT_TEST(BasicCollectGarbage) { - std::unique_ptr<TEnvironmentSetup> envPtr; - std::vector<ui32> regularGroups; - ui32 blobDepot; - ConfigureEnvironment(1, envPtr, regularGroups, blobDepot, 8, TBlobStorageGroupType::Erasure4Plus2Block); + ui32 seed; + Seed().LoadOrFail(&seed, sizeof(seed)); + TBlobDepotTestEnvironment tenv(seed); - TestBasicCollectGarbage(*envPtr, 15, regularGroups[0]); - TestBasicCollectGarbage(*envPtr, 100, blobDepot); + TestBasicCollectGarbage(tenv, 15, tenv.RegularGroups[0]); + TestBasicCollectGarbage(tenv, 100, tenv.BlobDepot); } Y_UNIT_TEST(VerifiedRandom) { - std::unique_ptr<TEnvironmentSetup> envPtr; - std::vector<ui32> regularGroups; - ui32 blobDepot; - ConfigureEnvironment(1, envPtr, regularGroups, blobDepot, 8, TBlobStorageGroupType::Erasure4Plus2Block); + ui32 seed; + Seed().LoadOrFail(&seed, sizeof(seed)); + TBlobDepotTestEnvironment tenv(seed); - // TestVerifiedRandom(*envPtr, 8, 15, regularGroups[0], 1000); - TestVerifiedRandom(*envPtr, 8, 100, blobDepot, 1000); + // TestVerifiedRandom(tenv, 8, 15, tenv.RegularGroups[0], 1000); + TestVerifiedRandom(tenv, 8, 100, tenv.BlobDepot, 1000); } Y_UNIT_TEST(LoadPutAndRead) { - std::unique_ptr<TEnvironmentSetup> envPtr; - std::vector<ui32> regularGroups; - ui32 blobDepot; - ConfigureEnvironment(1, envPtr, regularGroups, blobDepot, 8, TBlobStorageGroupType::Erasure4Plus2Block); + ui32 seed; + Seed().LoadOrFail(&seed, sizeof(seed)); + TBlobDepotTestEnvironment tenv(seed); - // TestLoadPutAndGet(*envPtr, 100, blobDepot, 1 << 10, 1 << 15, 500); - TestLoadPutAndGet(*envPtr, 100, blobDepot, 1 << 10, 1 << 15, 500); + // TestLoadPutAndGet(tenv, 100, tenv.BlobDepot, 1 << 10, 1 << 15, 500); + TestLoadPutAndGet(tenv, 100, tenv.BlobDepot, 100, 1 << 10, 500); } Y_UNIT_TEST(DecommitPutAndRead) { - std::unique_ptr<TEnvironmentSetup> envPtr; - std::vector<ui32> regularGroups; - ui32 blobDepot; - ConfigureEnvironment(1, envPtr, regularGroups, blobDepot, 8, TBlobStorageGroupType::Erasure4Plus2Block); + ui32 seed; + Seed().LoadOrFail(&seed, sizeof(seed)); + TBlobDepotTestEnvironment tenv(seed); - TestLoadPutAndGet(*envPtr, 15, regularGroups[0], 1 << 10, 1 << 15, 500, true); + TestLoadPutAndGet(tenv, 15, tenv.RegularGroups[0], 100, 1 << 10, 500, true, { 5, 1, 5, 1, 1, 0 }); } Y_UNIT_TEST(DecommitVerifiedRandom) { - std::unique_ptr<TEnvironmentSetup> envPtr; - std::vector<ui32> regularGroups; - ui32 blobDepot; - ConfigureEnvironment(1, envPtr, regularGroups, blobDepot, 8, TBlobStorageGroupType::Erasure4Plus2Block); - - TestVerifiedRandom(*envPtr, 8, 15, regularGroups[0], 1000, 499); - } - - Y_UNIT_TEST(RestoreGet) { - std::unique_ptr<TEnvironmentSetup> envPtr; - std::vector<ui32> regularGroups; - ui32 blobDepot; - ConfigureEnvironment(1, envPtr, regularGroups, blobDepot, 8, TBlobStorageGroupType::Erasure4Plus2Block); - TEnvironmentSetup& env = *envPtr; - - auto vdisksRegular = env.GetGroupInfo(regularGroups[0])->GetDynamicInfo().ServiceIdForOrderNumber; - - TestRestoreGet(*envPtr, 15, regularGroups[0], 10, &vdisksRegular); - // TestRestoreGet(*envPtr, 100, blobDepot, 10, &vdisksBlobDepot); - } - - Y_UNIT_TEST(RestoreDiscover) { - std::unique_ptr<TEnvironmentSetup> envPtr; - std::vector<ui32> regularGroups; - ui32 blobDepot; - ConfigureEnvironment(1, envPtr, regularGroups, blobDepot, 8, TBlobStorageGroupType::Erasure4Plus2Block); - TEnvironmentSetup& env = *envPtr; - - auto vdisksRegular = env.GetGroupInfo(regularGroups[0])->GetDynamicInfo().ServiceIdForOrderNumber; - - TestRestoreDiscover(*envPtr, 15, regularGroups[0], 10, &vdisksRegular); - // TestRestoreDiscover(*envPtr, 100, blobDepot, 10, &vdisksBlobDepot); - } - - Y_UNIT_TEST(RestoreRange) { - std::unique_ptr<TEnvironmentSetup> envPtr; - std::vector<ui32> regularGroups; - ui32 blobDepot; - ConfigureEnvironment(1, envPtr, regularGroups, blobDepot, 8, TBlobStorageGroupType::Erasure4Plus2Block); - TEnvironmentSetup& env = *envPtr; - - auto vdisksRegular = env.GetGroupInfo(regularGroups[0])->GetDynamicInfo().ServiceIdForOrderNumber; + ui32 seed; + Seed().LoadOrFail(&seed, sizeof(seed)); + TBlobDepotTestEnvironment tenv(seed); - TestRestoreRange(*envPtr, 15, regularGroups[0], 5, &vdisksRegular); - // TestRestoreRange(*envPtr, 100, blobDepot, 10, &vdisksBlobDepot); + TestVerifiedRandom(tenv, 8, 15, tenv.RegularGroups[0], 1000, 499, { 10, 10, 3, 3, 2, 1, 1, 3, 3, 0 }); } } diff --git a/ydb/core/blobstorage/ut_blobstorage/blob_depot_fat.cpp b/ydb/core/blobstorage/ut_blobstorage/blob_depot_fat.cpp new file mode 100644 index 00000000000..eac9d1253e0 --- /dev/null +++ b/ydb/core/blobstorage/ut_blobstorage/blob_depot_fat.cpp @@ -0,0 +1,60 @@ +#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h> +#include <ydb/core/blob_depot/events.h> + +#include "blob_depot_event_managers.h" +#include "blob_depot_auxiliary_structures.h" +#include "blob_depot_test_functions.h" + +#include <util/random/entropy.h> + +using namespace NKikimr::NBlobDepot; + +Y_UNIT_TEST_SUITE(BlobDepotFat) { + Y_UNIT_TEST(FatVerifiedRandom) { + ui32 seed; + Seed().LoadOrFail(&seed, sizeof(seed)); + TBlobDepotTestEnvironment tenv(seed, 1, 8, TBlobStorageGroupType::ErasureMirror3of4, 1500); + + TestVerifiedRandom(tenv, 8, 100, tenv.BlobDepot, 1e9); + } + + Y_UNIT_TEST(FatDecommitVerifiedRandom) { + ui32 seed; + Seed().LoadOrFail(&seed, sizeof(seed)); + TBlobDepotTestEnvironment tenv(seed, 1, 8, TBlobStorageGroupType::ErasureMirror3of4, 1500); + + TestVerifiedRandom(tenv, 8, 100, tenv.BlobDepot, 1e9, 1000); + } + +/* ----- Restore is not implemented in blob depot ------ + Y_UNIT_TEST(RestoreGet) { + ui32 seed; + Seed().LoadOrFail(&seed, sizeof(seed)); + TBlobDepotTestEnvironment tenv(seed); + auto vdisksRegular = tenv.Env->GetGroupInfo(tenv.RegularGroups[0])->GetDynamicInfo().ServiceIdForOrderNumber; + + TestRestoreGet(tenv, 15, tenv.RegularGroups[0], 10, &vdisksRegular); + TestRestoreGet(tenv, 100, tenv.BlobDepot, 10, &vdisksBlobDepot); + } + + Y_UNIT_TEST(RestoreDiscover) { + ui32 seed; + Seed().LoadOrFail(&seed, sizeof(seed)); + TBlobDepotTestEnvironment tenv(seed); + auto vdisksRegular = tenv.Env->GetGroupInfo(tenv.RegularGroups[0])->GetDynamicInfo().ServiceIdForOrderNumber; + + TestRestoreDiscover(tenv, 15, tenv.RegularGroups[0], 10, &vdisksRegular); + TestRestoreDiscover(tenv, 100, tenv.BlobDepot, 10, &vdisksBlobDepot); + } + + Y_UNIT_TEST(RestoreRange) { + ui32 seed; + Seed().LoadOrFail(&seed, sizeof(seed)); + TBlobDepotTestEnvironment tenv(seed); + auto vdisksRegular = tenv.Env->GetGroupInfo(tenv.RegularGroups[0])->GetDynamicInfo().ServiceIdForOrderNumber; + + TestRestoreRange(tenv, 15, tenv.RegularGroups[0], 10, &vdisksRegular); + TestRestoreRange(tenv, 100, tenv.BlobDepot, 10, &vdisksBlobDepot); + } +*/ +} diff --git a/ydb/core/blobstorage/ut_blobstorage/blob_depot_test_functions.cpp b/ydb/core/blobstorage/ut_blobstorage/blob_depot_test_functions.cpp new file mode 100644 index 00000000000..3bb4e83a795 --- /dev/null +++ b/ydb/core/blobstorage/ut_blobstorage/blob_depot_test_functions.cpp @@ -0,0 +1,920 @@ +#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h> +#include <ydb/core/blob_depot/events.h> + +#include <util/random/mersenne.h> +#include <util/random/random.h> + +#include <algorithm> +#include <random> + +#include "blob_depot_event_managers.h" +#include "blob_depot_test_functions.h" +#include "blob_depot_auxiliary_structures.h" + +void ConfigureEnvironment(ui32 numGroups, std::unique_ptr<TEnvironmentSetup>& envPtr, std::vector<ui32>& regularGroups, ui32& blobDepot, ui32 nodeCount, TBlobStorageGroupType erasure) { + envPtr = std::make_unique<TEnvironmentSetup>(TEnvironmentSetup::TSettings{ + .NodeCount = nodeCount, + .Erasure = erasure, + .SetupHive = true, + }); + + envPtr->CreateBoxAndPool(1, numGroups); + envPtr->Sim(TDuration::Seconds(20)); + + regularGroups = envPtr->GetGroups(); + + NKikimrBlobStorage::TConfigRequest request; + TString virtualPool = "virtual_pool"; + { + auto *cmd = request.AddCommand()->MutableDefineStoragePool(); + cmd->SetBoxId(1); + cmd->SetName(virtualPool); + cmd->SetErasureSpecies("none"); + cmd->SetVDiskKind("Default"); + } + { + auto *cmd = request.AddCommand()->MutableAllocateVirtualGroup(); + cmd->SetName("vg"); + cmd->SetHiveId(envPtr->Runtime->GetAppData()->DomainsInfo->HivesByHiveUid.begin()->second); + cmd->SetStoragePoolName(virtualPool); + auto *prof = cmd->AddChannelProfiles(); + prof->SetStoragePoolName(envPtr->StoragePoolName); + prof->SetCount(2); + prof = cmd->AddChannelProfiles(); + prof->SetStoragePoolName(envPtr->StoragePoolName); + prof->SetChannelKind(NKikimrBlobDepot::TChannelKind::Data); + prof->SetCount(2); + } + + auto response = envPtr->Invoke(request); + UNIT_ASSERT_C(response.GetSuccess(), response.GetErrorDescription()); + blobDepot = response.GetStatus(1).GetGroupId(0); + + envPtr->Sim(TDuration::Seconds(5)); // some time for blob depot to crank up +} + +void DecommitGroup(TBlobDepotTestEnvironment& tenv, ui32 groupId) { + TString blobDepotPool = "decommit_blob_depot_pool"; + ui32 blobDepotPoolId = 42; + tenv.Env->CreatePoolInBox(1, blobDepotPoolId, blobDepotPool); + NKikimrBlobStorage::TConfigRequest request; + + auto *cmd = request.AddCommand()->MutableDecommitGroups(); + cmd->AddGroupIds(groupId); + cmd->SetHiveId(tenv.Env->Runtime->GetAppData()->DomainsInfo->HivesByHiveUid.begin()->second); + auto *prof = cmd->AddChannelProfiles(); + prof->SetStoragePoolName(blobDepotPool); + prof->SetCount(2); + prof = cmd->AddChannelProfiles(); + prof->SetStoragePoolName(blobDepotPool); + prof->SetChannelKind(NKikimrBlobDepot::TChannelKind::Data); + prof->SetCount(2); + + auto response = tenv.Env->Invoke(request); + UNIT_ASSERT_C(response.GetSuccess(), response.GetErrorDescription()); +} + +void TestBasicPutAndGet(TBlobDepotTestEnvironment& tenv, ui64 tabletId, ui32 groupId) { + std::vector<TBlobInfo> blobs; + TBSState state; + state[tabletId]; + + blobs.push_back(TBlobInfo(tenv.DataGen(100), tabletId, 1)); + blobs.push_back(TBlobInfo(tenv.DataGen(100), tabletId, 2)); + blobs.push_back(TBlobInfo(tenv.DataGen(200), tabletId, 1)); + + VerifiedGet(*tenv.Env, 1, groupId, blobs[0], true, false, 0, state); + VerifiedGet(*tenv.Env, 1, groupId, blobs[1], true, false, 0, state); + + VerifiedPut(*tenv.Env, 1, groupId, blobs[0], state); + VerifiedPut(*tenv.Env, 1, groupId, blobs[1], state); + + VerifiedGet(*tenv.Env, 1, groupId, blobs[0], false, false, 0, state); + VerifiedGet(*tenv.Env, 1, groupId, blobs[1], false, false, 0, state); + VerifiedGet(*tenv.Env, 1, groupId, blobs[2], false, false, 0, state); + + VerifiedGet(*tenv.Env, 1, groupId, blobs, false, false, 0, state); + + blobs.push_back(TBlobInfo(tenv.DataGen(1000), tabletId + (1 << 12), 1)); + VerifiedPut(*tenv.Env, 1, groupId, blobs[2], state); + VerifiedPut(*tenv.Env, 1, groupId, blobs[3], state); + + VerifiedGet(*tenv.Env, 1, groupId, blobs[2], false, false, 0, state); + VerifiedGet(*tenv.Env, 1, groupId, blobs[3], false, false, 0, state); +} + +TLogoBlobID MinBlobID(ui64 tablet) { + return TLogoBlobID(tablet, 0, 0, 0, 0, 0); +} + +TLogoBlobID MaxBlobID(ui64 tablet) { + return TLogoBlobID(tablet, Max<ui32>(), Max<ui32>(), NKikimr::TLogoBlobID::MaxChannel, + NKikimr::TLogoBlobID::MaxBlobSize, NKikimr::TLogoBlobID::MaxCookie, NKikimr::TLogoBlobID::MaxPartId, + NKikimr::TLogoBlobID::MaxCrcMode); +} + +void TestBasicRange(TBlobDepotTestEnvironment& tenv, ui64 tabletId, ui32 groupId) { + std::vector<TBlobInfo> blobs; + TBSState state; + state[tabletId]; + blobs.push_back(TBlobInfo(tenv.DataGen(100), tabletId, 1)); + blobs.push_back(TBlobInfo(tenv.DataGen(100), tabletId, 2)); + blobs.push_back(TBlobInfo(tenv.DataGen(200), tabletId, 1)); + + VerifiedPut(*tenv.Env, 1, groupId, blobs[0], state); + VerifiedPut(*tenv.Env, 1, groupId, blobs[1], state); + + VerifiedRange(*tenv.Env, 1, groupId, tabletId, MinBlobID(tabletId), MaxBlobID(tabletId), false, false, blobs, state); + VerifiedRange(*tenv.Env, 1, groupId, tabletId, MinBlobID(tabletId), MaxBlobID(tabletId), false, true, blobs, state); + + ui32 n = 100; + for (ui32 i = 0; i < n; ++i) { + blobs.push_back(TBlobInfo(tenv.DataGen(100), tabletId, 1000 + i)); + if (i % 2) { + VerifiedPut(*tenv.Env, 1, groupId, blobs[i], state); + } + } + + VerifiedRange(*tenv.Env, 1, groupId, tabletId, blobs[0].Id, blobs[n/2 - 1].Id, false, false, blobs, state); + VerifiedRange(*tenv.Env, 1, groupId, tabletId, blobs[0].Id, blobs[n/2 - 1].Id, false, true, blobs, state); +} + +void TestBasicDiscover(TBlobDepotTestEnvironment& tenv, ui64 tabletId, ui32 groupId) { + + std::vector<TBlobInfo> blobs; + ui64 tablet2 = tabletId + 1000; + TBSState state; + state[tabletId]; + state[tablet2]; + + blobs.push_back(TBlobInfo(tenv.DataGen(100), tabletId, 1, 2)); + blobs.push_back(TBlobInfo(tenv.DataGen(100), tabletId, 1, 3)); + blobs.push_back(TBlobInfo(tenv.DataGen(200), tabletId, 1, 4)); + blobs.push_back(TBlobInfo(tenv.DataGen(200), tabletId, 2, 4, 1)); + + VerifiedDiscover(*tenv.Env, 1, groupId, tabletId, 0, false, false, 0, true, blobs, state); + VerifiedDiscover(*tenv.Env, 1, groupId, tabletId, 1, false, false, 0, true, blobs, state); + VerifiedDiscover(*tenv.Env, 1, groupId, tabletId, 0, true, false, 0, true, blobs, state); + + VerifiedPut(*tenv.Env, 1, groupId, blobs[0], state); + VerifiedPut(*tenv.Env, 1, groupId, blobs[1], state); + + VerifiedDiscover(*tenv.Env, 1, groupId, tabletId, 0, false, false, 0, true, blobs, state); + VerifiedDiscover(*tenv.Env, 1, groupId, tabletId, 0, true, false, 0, true, blobs, state); + VerifiedDiscover(*tenv.Env, 1, groupId, tabletId, 0, true, false, 0, true, blobs, state); + + VerifiedDiscover(*tenv.Env, 1, groupId, tabletId, 100, true, false, 0, true, blobs, state); + + blobs.push_back(TBlobInfo(tenv.DataGen(1000), tablet2, 10, 2)); + VerifiedDiscover(*tenv.Env, 1, groupId, tablet2, 0, true, false, 0, true, blobs, state); + + VerifiedPut(*tenv.Env, 1, groupId, blobs[3], state); + VerifiedDiscover(*tenv.Env, 1, groupId, tablet2, 0, false, false, 0, true, blobs, state); + + VerifiedDiscover(*tenv.Env, 1, groupId, tablet2, 42, true, false, 0, true, blobs, state); +} + +void TestBasicBlock(TBlobDepotTestEnvironment& tenv, ui64 tabletId, ui32 groupId) { + ui32 tablet2 = tabletId + 1; + std::vector<TBlobInfo> blobs; + TBSState state; + state[tabletId]; + state[tablet2]; + + ui32 lastGen = 0; + + blobs.push_back(TBlobInfo(tenv.DataGen(100), tabletId, 1, lastGen++)); + blobs.push_back(TBlobInfo(tenv.DataGen(100), tabletId, 1, lastGen++)); + blobs.push_back(TBlobInfo(tenv.DataGen(100), tabletId, 1, lastGen++)); + blobs.push_back(TBlobInfo(tenv.DataGen(100), tabletId, 1, lastGen++)); + blobs.push_back(TBlobInfo(tenv.DataGen(100), tabletId, 1, lastGen++)); // blobs[4] + + ui32 lastGen2 = 1; + blobs.push_back(TBlobInfo(tenv.DataGen(100), tablet2, 1, lastGen2++, 1)); + blobs.push_back(TBlobInfo(tenv.DataGen(100), tablet2, 2, lastGen2++, 2)); + blobs.push_back(TBlobInfo(tenv.DataGen(100), tablet2, 3, lastGen2++, 3)); + + VerifiedPut(*tenv.Env, 1, groupId, blobs[2], state); + + VerifiedBlock(*tenv.Env, 1, groupId, tabletId, 3, state); + + VerifiedPut(*tenv.Env, 1, groupId, blobs[1], state); + VerifiedPut(*tenv.Env, 1, groupId, blobs[3], state); + VerifiedGet(*tenv.Env, 1, groupId, blobs[3], false, false, 0, state); + + VerifiedPut(*tenv.Env, 1, groupId, blobs[4], state); + VerifiedGet(*tenv.Env, 1, groupId, blobs[4], false, false, 0, state); + + VerifiedBlock(*tenv.Env, 1, groupId, tabletId, 2, state); + VerifiedBlock(*tenv.Env, 1, groupId, tabletId, 3, state); + + VerifiedPut(*tenv.Env, 1, groupId, blobs[5], state); + + VerifiedBlock(*tenv.Env, 1, groupId, tablet2, 2, state); + + VerifiedPut(*tenv.Env, 1, groupId, blobs[6], state); + VerifiedGet(*tenv.Env, 1, groupId, blobs[6], false, false, 0, state); + + VerifiedPut(*tenv.Env, 1, groupId, blobs[7], state); + VerifiedGet(*tenv.Env, 1, groupId, blobs[7], false, false, 0, state); +} + +void TestBasicCollectGarbage(TBlobDepotTestEnvironment& tenv, ui64 tabletId, ui32 groupId) { + std::vector<TBlobInfo> blobs; + ui64 tablet2 = tabletId + 1; + TBSState state; + state[tabletId]; + state[tablet2]; + + for (ui32 i = 0; i < 10; ++i) { + blobs.push_back(TBlobInfo(tenv.DataGen(100), tabletId, 1, 1, i + 1, 0)); + } + + for (ui32 i = 10; i < 20; ++i) { + blobs.push_back(TBlobInfo(tenv.DataGen(100), tabletId, 1, 1, i + 1, (i % 2))); + } + + for (ui32 i = 0; i < 10; ++i) { + blobs.push_back(TBlobInfo(tenv.DataGen(100), tabletId, 1, 2, i + 1, 0)); + } + + for (ui32 i = 0; i < 10; ++i) { + blobs.push_back(TBlobInfo(tenv.DataGen(100), tabletId, 1, 3 + i, 1, 0)); + } + + for (ui32 i = 0; i < 5; ++i) { + blobs.push_back(TBlobInfo(tenv.DataGen(100), tablet2, 1, 1, 1 + i, 0)); + } + + for (ui32 i = 0; i < 5; ++i) { + blobs.push_back(TBlobInfo(tenv.DataGen(100), tablet2, 1, 2 + i, 1, 0)); + } + + // blobs[0]..blobs[39] - tabletId + // blobs[40]..blobs[49] - tablet2 + + for (auto& blob : blobs) { + VerifiedPut(*tenv.Env, 1, groupId, blob, state); + } + + ui32 gen = 2; + ui32 perGenCtr = 1; + + VerifiedCollectGarbage(*tenv.Env, 1, groupId, tabletId, gen, perGenCtr++, 0, true, 1, 2, nullptr, nullptr, false, false, + blobs, state); + VerifiedGet(*tenv.Env, 1, groupId, blobs[0], false, false, 0, state); + VerifiedGet(*tenv.Env, 1, groupId, blobs[1], false, false, 0, state); + VerifiedGet(*tenv.Env, 1, groupId, blobs[2], false, false, 0, state); + + VerifiedGet(*tenv.Env, 1, groupId, blobs[20], false, false, 0, state); + VerifiedGet(*tenv.Env, 1, groupId, blobs[30], false, false, 0, state); + VerifiedGet(*tenv.Env, 1, groupId, blobs[31], false, false, 0, state); + VerifiedGet(*tenv.Env, 1, groupId, blobs[40], false, false, 0, state); + + VerifiedCollectGarbage(*tenv.Env, 1, groupId, tabletId, gen, perGenCtr++, 0, true, 1, 1, nullptr, nullptr, false, false, blobs, state); + + { + TBlobInfo blob(tenv.DataGen(100), tabletId, 99, 1, 1, 0); + VerifiedPut(*tenv.Env, 1, groupId, blob, state); + blobs.push_back(blob); + } + + VerifiedCollectGarbage(*tenv.Env, 1, groupId, tabletId, gen, perGenCtr++, 0, true, 1, 3, nullptr, nullptr, false, true, + blobs, state); + + { + TBlobInfo blob(tenv.DataGen(100), tabletId, 99, 1, 3, 0); + VerifiedPut(*tenv.Env, 1, groupId, blob, state); + blobs.push_back(blob); + } + VerifiedRange(*tenv.Env, 1, groupId, tabletId, blobs[1].Id, blobs[1].Id, false, false, blobs, state); + + VerifiedGet(*tenv.Env, 1, groupId, blobs[1], false, false, 0, state); + VerifiedGet(*tenv.Env, 1, groupId, blobs[2], false, false, 0, state); + VerifiedGet(*tenv.Env, 1, groupId, blobs[3], false, false, 0, state); + + VerifiedGet(*tenv.Env, 1, groupId, blobs[20], false, false, 0, state); + VerifiedGet(*tenv.Env, 1, groupId, blobs[30], false, false, 0, state); + VerifiedGet(*tenv.Env, 1, groupId, blobs[31], false, false, 0, state); + VerifiedGet(*tenv.Env, 1, groupId, blobs[40], false, false, 0, state); + + VerifiedCollectGarbage(*tenv.Env, 1, groupId, tabletId, gen, perGenCtr++, 0, true, 1, 1, nullptr, nullptr, false, true, blobs, state); + + VerifiedCollectGarbage(*tenv.Env, 1, groupId, tabletId, gen, perGenCtr++, 0, false, 1, 5, + new TVector<TLogoBlobID>({blobs[4].Id, blobs[5].Id}), + nullptr, + false, false, + blobs, state); + + VerifiedGet(*tenv.Env, 1, groupId, blobs[4], false, false, 0, state); + VerifiedGet(*tenv.Env, 1, groupId, blobs[5], false, false, 0, state); + + VerifiedCollectGarbage(*tenv.Env, 1, groupId, tabletId, gen, perGenCtr++, 0, false, 1, 6, + nullptr, + new TVector<TLogoBlobID>({blobs[4].Id, blobs[5].Id}), + false, false, + blobs, state); + VerifiedGet(*tenv.Env, 1, groupId, blobs[4], false, false, 0, state); + VerifiedGet(*tenv.Env, 1, groupId, blobs[5], false, false, 0, state); + + + VerifiedCollectGarbage(*tenv.Env, 1, groupId, tabletId, gen, perGenCtr++, 0, true, 1, 15, nullptr, nullptr, false, true, blobs, state); + + VerifiedRange(*tenv.Env, 1, groupId, tabletId, blobs[10].Id, blobs[19].Id, false, false, blobs, state); + + gen++; + perGenCtr = 1; + VerifiedCollectGarbage(*tenv.Env, 1, groupId, tabletId, gen + 1, perGenCtr++, 0, true, 2, 1, nullptr, nullptr, false, false, blobs, state); + VerifiedGet(*tenv.Env, 1, groupId, blobs[18], false, false, 0, state); + VerifiedGet(*tenv.Env, 1, groupId, blobs[19], false, false, 0, state); + VerifiedGet(*tenv.Env, 1, groupId, blobs[20], false, false, 0, state); + VerifiedGet(*tenv.Env, 1, groupId, blobs[21], false, false, 0, state); + VerifiedGet(*tenv.Env, 1, groupId, blobs[30], false, false, 0, state); + VerifiedGet(*tenv.Env, 1, groupId, blobs[31], false, false, 0, state); + VerifiedGet(*tenv.Env, 1, groupId, blobs[40], false, false, 0, state); + + VerifiedCollectGarbage(*tenv.Env, 1, groupId, tabletId, 6, 1, 0, true, 2, 1, nullptr, nullptr, false, false, blobs, state); + + VerifiedRange(*tenv.Env, 1, groupId, tabletId, blobs[0].Id, blobs[39].Id, false, false, blobs, state); + VerifiedRange(*tenv.Env, 1, groupId, tablet2, blobs[40].Id, blobs[49].Id, false, false, blobs, state); + + VerifiedCollectGarbage(*tenv.Env, 1, groupId, tabletId, 7, 2, 0, true, 3, 1, nullptr, nullptr, false, true, blobs, state); + + VerifiedRange(*tenv.Env, 1, groupId, tabletId, blobs[0].Id, blobs[39].Id, false, false, blobs, state); + + VerifiedBlock(*tenv.Env, 1, groupId, tabletId, 10, state); + VerifiedCollectGarbage(*tenv.Env, 1, groupId, tabletId, 7, 1, 0, true, 100, 1, nullptr, nullptr, false, true, blobs, state); + VerifiedGet(*tenv.Env, 1, groupId, blobs[39], false, false, 0, state); +} + +void TestRestoreGet(TBlobDepotTestEnvironment& tenv, ui64 tabletId, ui32 groupId, ui32 blobsNum, std::vector<TActorId>* vdisks) { + std::vector<TBlobInfo> blobs; + TBSState state; + state[tabletId]; + + std::vector<TActorId> allVdisks = *vdisks; + std::mt19937 g; + std::shuffle(allVdisks.begin(), allVdisks.end(), g); + + std::vector<TActorId> brokenVdisks = { allVdisks[0], allVdisks[1], allVdisks[2], allVdisks[3] }; + auto blockedEventType = TEvBlobStorage::TEvVPut::EventType; + tenv.Env->Runtime->FilterFunction = [&](ui32 /*nodeId*/, std::unique_ptr<IEventHandle>& ev) { + if (ev->GetTypeRewrite() == blockedEventType) { + for (auto vdisk : brokenVdisks) { + if (ev->Recipient == vdisk) { + return false; + } + } + } + return true; + }; + + for (ui32 i = 0; i < blobsNum; ++i) { + blobs.push_back(TBlobInfo(tenv.DataGen(100), tabletId, 1 + i, 1, 1, 0)); + } + + for (ui32 i = 0; i < blobsNum; ++i) { + VerifiedPut(*tenv.Env, 1, groupId, blobs[i], state); + } + + brokenVdisks = { allVdisks[0], allVdisks[1] }; + blockedEventType = TEvBlobStorage::TEvVGet::EventType; + + for (ui32 i = 0; i < blobsNum; ++i) { + VerifiedGet(*tenv.Env, 1, groupId, blobs[i], true, false, 0, state, false); + } + + blockedEventType = TEvBlobStorage::TEvVGet::EventType; + brokenVdisks = { allVdisks[4], allVdisks[5] }; + + for (ui32 i = 0; i < blobsNum; ++i) { + if (blobs[i].Status == TBlobInfo::EStatus::WRITTEN) { + VerifiedGet(*tenv.Env, 1, groupId, blobs[i], false, false, 0, state, false); + } + } +} + +void TestRestoreDiscover(TBlobDepotTestEnvironment& tenv, ui64 tabletId, ui32 groupId, ui32 blobsNum, std::vector<TActorId>* vdisks) { + std::vector<TBlobInfo> blobs; + TBSState state; + state[tabletId]; + + std::vector<TActorId> allVdisks = *vdisks; + std::mt19937 g; + std::shuffle(allVdisks.begin(), allVdisks.end(), g); + + std::vector<TActorId> brokenVdisks = { allVdisks[0], allVdisks[1], allVdisks[2], allVdisks[3] }; + auto blockedEventType = TEvBlobStorage::TEvVPut::EventType; + tenv.Env->Runtime->FilterFunction = [&](ui32 /*nodeId*/, std::unique_ptr<IEventHandle>& ev) { + if (ev->GetTypeRewrite() == blockedEventType) { + for (auto vdisk : brokenVdisks) { + if (ev->Recipient == vdisk) { + return false; + } + } + } + return true; + }; + + for (ui32 i = 0; i < blobsNum; ++i) { + blobs.push_back(TBlobInfo(tenv.DataGen(100), tabletId, 1, 1, 1 + i, 0)); + brokenVdisks = { allVdisks[0], allVdisks[1], allVdisks[2], allVdisks[3] }; + blockedEventType = TEvBlobStorage::TEvVPut::EventType; + VerifiedPut(*tenv.Env, 1, groupId, blobs[i], state); + brokenVdisks = { allVdisks[0], allVdisks[1] }; + blockedEventType = TEvBlobStorage::TEvVGet::EventType; + VerifiedDiscover(*tenv.Env, 1, groupId, tabletId, 0, true, false, 0, false, blobs, state, false); + } + + for (ui32 i = blobsNum; i < 2 * blobsNum; ++i) { + blobs.push_back(TBlobInfo(tenv.DataGen(100), tabletId, 1, 1, 1 + i, 0)); + brokenVdisks = { allVdisks[0], allVdisks[1], allVdisks[2], allVdisks[3] }; + blockedEventType = TEvBlobStorage::TEvVPut::EventType; + VerifiedPut(*tenv.Env, 1, groupId, blobs[i], state); + brokenVdisks = { allVdisks[0], allVdisks[1] }; + blockedEventType = TEvBlobStorage::TEvVGet::EventType; + VerifiedDiscover(*tenv.Env, 1, groupId, tabletId, 0, false, false, 0, false, blobs, state, false); + } + + blockedEventType = TEvBlobStorage::TEvVGet::EventType; + brokenVdisks = { allVdisks[4], allVdisks[5] }; + + for (ui32 i = 0; i < blobsNum * 2; ++i) { + if (blobs[i].Status == TBlobInfo::EStatus::WRITTEN) { + VerifiedGet(*tenv.Env, 1, groupId, blobs[i], false, false, 0, state, false); + } + } +} + +void TestRestoreRange(TBlobDepotTestEnvironment& tenv, ui64 tabletId, ui32 groupId, ui32 blobsNum, std::vector<TActorId>* vdisks) { + std::vector<TBlobInfo> blobs; + TBSState state; + state[tabletId]; + + std::vector<TActorId> allVdisks = *vdisks; + std::mt19937 g; + std::shuffle(allVdisks.begin(), allVdisks.end(), g); + + std::vector<TActorId> brokenVdisks = { allVdisks[0], allVdisks[1], allVdisks[2], allVdisks[3] }; + auto blockedEventType = TEvBlobStorage::TEvVPut::EventType; + tenv.Env->Runtime->FilterFunction = [&](ui32 /*nodeId*/, std::unique_ptr<IEventHandle>& ev) { + if (ev->GetTypeRewrite() == blockedEventType) { + for (auto vdisk : brokenVdisks) { + if (ev->Recipient == vdisk) { + return false; + } + } + } + return true; + }; + + for (ui32 i = 0; i < blobsNum; ++i) { + blobs.push_back(TBlobInfo(tenv.DataGen(100), tabletId, tenv.Rand(NKikimr::TLogoBlobID::MaxCookie), 1, 1 + i, 0)); + VerifiedPut(*tenv.Env, 1, groupId, blobs[i], state); + } + + for (ui32 i = blobsNum; i < 2 * blobsNum; ++i) { + blobs.push_back(TBlobInfo(tenv.DataGen(100), tabletId, tenv.Rand(NKikimr::TLogoBlobID::MaxCookie), 1, 1 + i, 0)); + VerifiedPut(*tenv.Env, 1, groupId, blobs[i], state); + } + + blockedEventType = TEvBlobStorage::TEvVGet::EventType; + brokenVdisks = { allVdisks[0], allVdisks[1] }; + + VerifiedRange(*tenv.Env, 1, groupId, tabletId, blobs[0].Id, blobs[blobsNum - 1].Id, true, false, blobs, state, false); + VerifiedRange(*tenv.Env, 1, groupId, tabletId, blobs[blobsNum].Id, blobs[2 * blobsNum - 1].Id, true, true, blobs, state, false); + + blockedEventType = TEvBlobStorage::TEvVGet::EventType; + brokenVdisks = { allVdisks[4], allVdisks[5] }; + + for (ui32 i = 0; i < 2 * blobsNum; ++i) { + if (blobs[i].Status == TBlobInfo::EStatus::WRITTEN) { + VerifiedGet(*tenv.Env, 1, groupId, blobs[i], false, false, 0, state, false); + } + } +} + +void TestVerifiedRandom(TBlobDepotTestEnvironment& tenv, ui32 nodeCount, ui64 tabletId0, ui32 groupId, ui32 iterationsNum, ui32 decommitStep, std::vector<ui32> probabilities) { + enum EActions { + ALTER = 0, + PUT, + GET, + MULTIGET, + RANGE, + BLOCK, + DISCOVER, + COLLECT_GARBAGE_HARD, + COLLECT_GARBAGE_SOFT, + RESTART_BLOB_DEPOT, + }; + std::vector<ui32> probs = probabilities; + TIntervals act(probs); + + std::vector<ui64> tablets = {tabletId0, tabletId0 + 1, tabletId0 + 2}; + std::vector<ui32> tabletGen = {1, 1, 1}; + std::vector<ui32> tabletStep = {1, 1, 1}; + std::vector<ui32> channels = {0, 1, 2}; + + std::vector<TBlobInfo> blobs; + + blobs.push_back(TBlobInfo("junk", 999, 999, 1, 1, 0)); + + TBSState state; + for (ui32 i = 0; i < tablets.size(); ++i) { + state[tablets[i]]; + } + + ui32 perGenCtr = 0; + + for (ui32 iteration = 0; iteration < iterationsNum; ++iteration) { + if (iteration == decommitStep) { + DecommitGroup(tenv, groupId); + continue; + } + if (tenv.IsFinished()) { + break; + } + ui32 tablet = tenv.Rand(tablets.size()); + ui32 tabletId = tablets[tablet]; + ui32 channel = tenv.Rand(channels); + ui32& gen = tabletGen[tablet]; + ui32& step = tabletStep[tablet]; + ui32 node = tenv.Rand(1, nodeCount); + + ui32 softCollectGen = state[tabletId].Channels[channel].SoftCollectGen; + ui32 softCollectStep = state[tabletId].Channels[channel].SoftCollectStep; + ui32 hardCollectGen = state[tabletId].Channels[channel].HardCollectGen; + ui32 hardCollectStep = state[tabletId].Channels[channel].HardCollectStep; + + ui32 action = act.GetInterval(tenv.Rand(act.UpperLimit())); + // Cerr << action << Endl; + switch (action) { + case EActions::ALTER: + { + if (tenv.Rand(3) == 0) { + gen += tenv.Rand(1, 2); + perGenCtr = 0; + } else { + step += tenv.Rand(1, 2); + } + } + break; + + case EActions::PUT: + { + ui32 cookie = tenv.Rand(NKikimr::TLogoBlobID::MaxCookie); + TBlobInfo blob(tenv.DataGen(tenv.Rand(50, 1000)), tabletId, cookie, gen, step, channel); + VerifiedPut(*tenv.Env, node, groupId, blob, state); + blobs.push_back(blob); + } + break; + + case EActions::GET: + { + TBlobInfo& blob = tenv.Rand(blobs); + bool mustRestoreFirst = false; + bool indexOnly = tenv.Rand(2); + ui32 forceBlockedGeneration = 0; + VerifiedGet(*tenv.Env, node, groupId, blob, mustRestoreFirst, indexOnly, forceBlockedGeneration, state); + } + break; + + case EActions::MULTIGET: + { + std::vector<TBlobInfo> getBlobs; + ui32 requestSize = tenv.Rand(50, 100); + for (ui32 i = 0; i < blobs.size() && i < requestSize; ++i) { + TBlobInfo& blob = tenv.Rand(blobs); + if (blob.Id.TabletID() == tabletId) { + getBlobs.push_back(blob); + } + } + + if (getBlobs.empty()) { + getBlobs.push_back(blobs[0]); + } + + bool mustRestoreFirst = false; + bool indexOnly = tenv.Rand(2); + ui32 forceBlockedGeneration = 0; + VerifiedGet(*tenv.Env, node, groupId, getBlobs, mustRestoreFirst, indexOnly, forceBlockedGeneration, state); + } + break; + + case EActions::RANGE: + { + TLogoBlobID r1 = tenv.Rand(blobs).Id; + TLogoBlobID r2 = tenv.Rand(blobs).Id; + + TLogoBlobID from(tabletId, r1.Generation(), r1.Step(), r1.Channel(), r1.BlobSize(), r1.Cookie()); + TLogoBlobID to(tabletId, r2.Generation(), r2.Step(), r2.Channel(), r2.BlobSize(), r2.Cookie()); + + if (from > to) { + std::swap(from, to); + } + + bool mustRestoreFirst = false; + bool indexOnly = tenv.Rand(2); + VerifiedRange(*tenv.Env, node, groupId, tabletId, from, to, mustRestoreFirst, indexOnly, blobs, state); + } + break; + + case EActions::BLOCK: + { + ui32 prevBlockedGen = state[tabletId].BlockedGen; + ui32 tryBlock = prevBlockedGen + tenv.Rand(4); + if (tryBlock > 0) { + tryBlock -= 1; + } + + VerifiedBlock(*tenv.Env, node, groupId, tabletId, tryBlock, state); + } + break; + + + case EActions::DISCOVER: + { + ui32 minGeneration = tenv.Rand(0, gen + 2); + bool readBody = tenv.Rand(2); + bool discoverBlockedGeneration = tenv.Rand(2); + ui32 forceBlockedGeneration = 0; + bool fromLeader = tenv.Rand(2); + + VerifiedDiscover(*tenv.Env, node, groupId, tabletId, minGeneration, readBody, discoverBlockedGeneration, forceBlockedGeneration, + fromLeader, blobs, state); + } + break; + + case EActions::COLLECT_GARBAGE_HARD: + { + ui32 tryGen = hardCollectGen + tenv.Rand(2); + ui32 tryStep = 0; + if (tryGen > 0 && !tenv.Rand(3)) { tryGen -= 1; } + if (tryGen > hardCollectGen) { + tryStep = tenv.Rand(hardCollectStep / 2); + } else { + tryStep = hardCollectStep + tenv.Rand(2); + if (tryStep > 0 && !tenv.Rand(3)) { tryStep -= 1; } + } + + bool collect = tenv.Rand(2); + bool isMultiCollectAllowed = tenv.Rand(2); + + THolder<TVector<TLogoBlobID>> keep(new TVector<TLogoBlobID>()); + THolder<TVector<TLogoBlobID>> doNotKeep(new TVector<TLogoBlobID>()); + + for (auto& blob : blobs) { + if (blob.Status == TBlobInfo::EStatus::WRITTEN) { + if (!tenv.Rand(5)) { + keep->push_back(blob.Id); + } else if (tenv.Rand(2)) { + doNotKeep->push_back(blob.Id); + } + } + } + + if (keep->size() == 0 && doNotKeep->size() == 0) { + collect = true; + } + + VerifiedCollectGarbage(*tenv.Env, node, groupId, tabletId, gen, perGenCtr++, channel, collect, + tryGen, tryStep, keep.Release(), doNotKeep.Release(), isMultiCollectAllowed, true, blobs, state); + } + break; + + case EActions::COLLECT_GARBAGE_SOFT: + { + ui32 tryGen = softCollectGen + tenv.Rand(2); + ui32 tryStep = 0; + if (tryGen > 0 && !tenv.Rand(3)) { tryGen -= 1; } + if (tryGen > softCollectGen) { + tryStep = tenv.Rand(softCollectStep / 2); + } else { + tryStep = softCollectStep + tenv.Rand(2); + if (tryStep > 0 && !tenv.Rand(3)) { tryStep -= 1; } + } + + bool collect = tenv.Rand(2); + bool isMultiCollectAllowed = tenv.Rand(2); + + THolder<TVector<TLogoBlobID>> keep(new TVector<TLogoBlobID>()); + THolder<TVector<TLogoBlobID>> doNotKeep(new TVector<TLogoBlobID>()); + + for (auto& blob : blobs) { + if (blob.Status == TBlobInfo::EStatus::WRITTEN) { + if (!tenv.Rand(5)) { + keep->push_back(blob.Id); + } else if (tenv.Rand(2)) { + doNotKeep->push_back(blob.Id); + } + } + } + + if (keep->size() == 0 && doNotKeep->size() == 0) { + collect = true; + } + + VerifiedCollectGarbage(*tenv.Env, node, groupId, tabletId, gen, perGenCtr++, channel, collect, + tryGen, tryStep, keep.Release(), doNotKeep.Release(), isMultiCollectAllowed, false, blobs, state); + } + break; + case EActions::RESTART_BLOB_DEPOT: + if (tenv.BlobDepotTabletId) { + auto edge = tenv.Env->Runtime->AllocateEdgeActor(node); + tenv.Env->Runtime->WrapInActorContext(edge, [&] { + TActivationContext::Register(CreateTabletKiller(tenv.BlobDepotTabletId)); + }); + tenv.Env->Runtime->DestroyActor(edge); + } + break; + + default: + UNIT_FAIL("TIntervals failed"); + } + } +} + +void TestLoadPutAndGet(TBlobDepotTestEnvironment& tenv, ui64 tabletId, ui32 groupId, ui32 blobsNum, ui32 maxBlobSize, ui32 readsNum, bool decommit, std::vector<ui32> probablities) { + enum EActions { + GET, + MULTIGET, + RANGE, + DISCOVER, + CATCH_ALL, + RESTART_BLOB_DEPOT, + }; + std::vector<ui32> probs = probablities; + TIntervals act(probs); + + std::vector<TBlobInfo> blobs; + std::map<TLogoBlobID, TBlobInfo*> mappedBlobs; + TBSState state; + state[tabletId]; + + TActorId edge = tenv.Env->Runtime->AllocateEdgeActor(1); + + blobs.reserve(blobsNum); + + std::map<ui64, std::shared_ptr<TEvArgs>> requests; + + ui32 getCtr = 0; + ui32 rangeCtr = 0; + ui32 discoverCtr = 0; + + for (ui32 i = 0; i < blobsNum; ++i) { + blobs.push_back(TBlobInfo(tenv.DataGen(tenv.Rand(1, maxBlobSize)), tabletId, tenv.Rand(NKikimr::TLogoBlobID::MaxCookie), 1, 1, 0)); + mappedBlobs[blobs[i].Id] = &blobs[i]; + } + + for (ui32 i = 0; i < blobsNum; ++i) { + SendTEvPut(*tenv.Env, edge, groupId, blobs[i].Id, blobs[i].Data); + } + + for (ui32 i = 0; i < blobsNum; ++i) { + auto res = CaptureTEvPutResult(*tenv.Env, edge, false); + UNIT_ASSERT_C(res->Get(), TStringBuilder() << "Fail on iteration# " << i); + auto it = mappedBlobs.find(res->Get()->Id); + if (it == mappedBlobs.end()) { + UNIT_FAIL("Put nonexistent blob"); + } + VerifyTEvPutResult(res.Release(), *it->second, state); + } + + if (decommit) { + DecommitGroup(tenv, groupId); + } + + for (ui32 iteration = 0; iteration < readsNum; ++iteration) { + ui32 action = act.GetInterval(tenv.Rand(act.UpperLimit())); + if (iteration == readsNum - 1) { // Catch all results on the last iteration + action = 4; + } + if (tenv.IsFinished()) { + break; + } + + ui64 cookie = tenv.Rand64(); + // Cerr << action << Endl; + switch (action) { + case EActions::GET: + { + ui32 blobNum = tenv.Rand(1 , blobsNum); + bool mustRestoreFirst = tenv.Rand(2); + bool indexOnly = tenv.Rand(2); + ui32 forceBlockedGeneration = 0; + SendTEvGet(*tenv.Env, edge, groupId, blobs[blobNum].Id, mustRestoreFirst, indexOnly, forceBlockedGeneration, cookie); + getCtr++; + requests[cookie] = std::make_shared<TEvGetArgs>(mustRestoreFirst, indexOnly, forceBlockedGeneration); + } + break; + + case EActions::MULTIGET: + { + ui32 blobsInRequest = tenv.Rand(1, 33); + std::vector<TBlobInfo> request; + for (ui32 i = 0; i < blobsInRequest; ++i) { + request.push_back(tenv.Rand(blobs)); + } + + bool mustRestoreFirst = tenv.Rand(2); + bool indexOnly = tenv.Rand(2); + ui32 forceBlockedGeneration = 0; + SendTEvGet(*tenv.Env, edge, groupId, request, mustRestoreFirst, indexOnly, forceBlockedGeneration, cookie); + getCtr++; + requests[cookie] = std::make_shared<TEvGetArgs>(mustRestoreFirst, indexOnly, forceBlockedGeneration); + } + break; + + case EActions::RANGE: + { + TLogoBlobID from = tenv.Rand(blobs).Id; + TLogoBlobID to = tenv.Rand(blobs).Id; + if (from > to) { + std::swap(from, to); + } + + bool mustRestoreFirst = false; + bool indexOnly = tenv.Rand(2); + SendTEvRange(*tenv.Env, edge, groupId, tabletId, from, to, mustRestoreFirst, indexOnly, cookie); + rangeCtr++; + requests[cookie] = std::make_shared<TEvRangeArgs>(mustRestoreFirst, indexOnly); + } + break; + + case EActions::DISCOVER: + { + ui32 minGeneration = 0; + bool readBody = tenv.Rand(2); + bool discoverBlockedGeneration = tenv.Rand(2); + ui32 forceBlockedGeneration = 0; + bool fromLeader = tenv.Rand(2); + + SendTEvDiscover(*tenv.Env, edge, groupId, tabletId, minGeneration, readBody, discoverBlockedGeneration, forceBlockedGeneration, + fromLeader, cookie); + discoverCtr++; + requests[cookie] = std::make_shared<TEvDiscoverArgs>(minGeneration, readBody, discoverBlockedGeneration, forceBlockedGeneration, fromLeader); + } + break; + + case EActions::CATCH_ALL: + { + // Cerr << getCtr << ' ' << rangeCtr << ' ' << discoverCtr << Endl; + while (getCtr + rangeCtr + discoverCtr) { + auto ev = CaptureAnyResult(*tenv.Env, edge); + UNIT_ASSERT_C(ev, TStringBuilder() << "Event lost, expected " << getCtr << " TEvGetResult's, " << rangeCtr << " TEvRangeResult's, " << discoverCtr << " TEvDiscoverResult's"); + switch (ev->GetTypeRewrite()) { + case TEvBlobStorage::TEvGetResult::EventType: + { + std::unique_ptr<TEventHandle<TEvBlobStorage::TEvGetResult>> res(reinterpret_cast<TEventHandle<TEvBlobStorage::TEvGetResult>*>(ev.release())); + UNIT_ASSERT(res); + std::vector<TBlobInfo> response; + ui32 responseSz = res->Get()->ResponseSz; + for (ui32 i = 0; i < responseSz; ++i) { + response.push_back(*mappedBlobs[res->Get()->Responses[i].Id]); + } + TEvGetArgs args = *requests[res->Cookie]->Get<TEvGetArgs>(); + VerifyTEvGetResult(res.release(), response, args.MustRestoreFirst, args.IndexOnly, args.ForceBlockedGeneration, state); + } + --getCtr; + break; + case TEvBlobStorage::TEvRangeResult::EventType: + { + std::unique_ptr<TEventHandle<TEvBlobStorage::TEvRangeResult>> res(reinterpret_cast<TEventHandle<TEvBlobStorage::TEvRangeResult>*>(ev.release())); + UNIT_ASSERT(res); + TLogoBlobID from = res->Get()->From; + TLogoBlobID to = res->Get()->To; + TEvRangeArgs args = *requests[res->Cookie]->Get<TEvRangeArgs>(); + VerifyTEvRangeResult(res.release(), tabletId, from, to, args.MustRestoreFirst, args.IndexOnly, blobs, state); + } + --rangeCtr; + break; + case TEvBlobStorage::TEvDiscoverResult::EventType: + { + std::unique_ptr<TEventHandle<TEvBlobStorage::TEvDiscoverResult>> res(reinterpret_cast<TEventHandle<TEvBlobStorage::TEvDiscoverResult>*>(ev.release())); + UNIT_ASSERT(res); + UNIT_ASSERT(res->Get()); + TEvDiscoverArgs args = *requests[res->Cookie]->Get<TEvDiscoverArgs>(); + VerifyTEvDiscoverResult(res.release(), tabletId, args.MinGeneration, args.ReadBody, args.DiscoverBlockedGeneration, + args.ForceBlockedGeneration, args.FromLeader, blobs, state); + } + --discoverCtr; + break; + } + } + } + break; + + case EActions::RESTART_BLOB_DEPOT: + // Cerr << "RESTART" << Endl; + if (tenv.BlobDepotTabletId) { + tenv.Env->Runtime->WrapInActorContext(edge, [&] { + TActivationContext::Register(CreateTabletKiller(tenv.BlobDepotTabletId)); + }); + } + break; + + default: + UNIT_FAIL("TIntervals failed"); + } + } +}
\ No newline at end of file diff --git a/ydb/core/blobstorage/ut_blobstorage/blob_depot_test_functions.h b/ydb/core/blobstorage/ut_blobstorage/blob_depot_test_functions.h new file mode 100644 index 00000000000..5ff4f4593e4 --- /dev/null +++ b/ydb/core/blobstorage/ut_blobstorage/blob_depot_test_functions.h @@ -0,0 +1,115 @@ +#pragma once + +#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h> +#include <ydb/core/blob_depot/events.h> + +#include <util/random/mersenne.h> +#include <util/random/random.h> + +#include <algorithm> +#include <random> + +#include <blob_depot_event_managers.h> +#include <blob_depot_auxiliary_structures.h> + +void ConfigureEnvironment(ui32 numGroups, std::unique_ptr<TEnvironmentSetup>& envPtr, std::vector<ui32>& regularGroups, ui32& blobDepot, ui32 nodeCount = 8, + TBlobStorageGroupType erasure = TBlobStorageGroupType::ErasureMirror3of4); + + +struct TBlobDepotTestEnvironment { + TMersenne<ui32> Mt; + TMersenne<ui64> Mt64; + + std::unique_ptr<TEnvironmentSetup> Env; + std::vector<ui32> RegularGroups; + ui32 BlobDepot; + ui32 BlobDepotTabletId; + THPTimer Timer; + ui32 TimeLimit; + TBlobDepotTestEnvironment(ui32 seed = 0, ui32 numGroups = 1, ui32 nodeCount = 8, TBlobStorageGroupType erasure = TBlobStorageGroupType::ErasureMirror3of4, ui32 timeLimit = 0) + : Mt(seed) + , Mt64(seed) + , TimeLimit(timeLimit) { + Cerr << "Mersenne random seed " << seed << Endl; + ConfigureEnvironment(numGroups, Env, RegularGroups, BlobDepot, nodeCount, erasure); + BlobDepotTabletId = 0; + } + + TString DataGen(ui32 len) { + TString res = ""; + for (ui32 i = 0; i < len; ++i) { + res += 'A' + Mt.GenRand() % ('z' - 'A'); + } + return res; + } + + ui32 Rand(ui32 a, ui32 b) { + if (a >= b) { + return a; + } + return Mt.GenRand() % (b - a) + a; + } + + ui32 Rand(ui32 b) { + return Rand(0, b); + } + + ui32 Rand() { + return Mt.GenRand(); + } + + ui32 Rand64() { + return Mt64.GenRand(); + } + + template <class T> + T& Rand(std::vector<T>& v) { + return v[Rand(v.size())]; + } + + ui32 SeedRand(ui32 a, ui32 b, ui32 seed) { + TMersenne<ui32> temp(seed); + if (a >= b) { + return a; + } + return temp.GenRand() % (b - a) + a; + } + + ui32 SeedRand(ui32 b, ui32 seed) { + return SeedRand(0, b, seed); + } + + template <class T> + const T& Rand(const std::vector<T>& v) { + return v[Rand(v.size())]; + } + + bool IsFinished() { + return TimeLimit && Timer.Passed() > TimeLimit; + } +}; + +void DecommitGroup(TBlobDepotTestEnvironment& tenv, ui32 groupId); + +void TestBasicPutAndGet(TBlobDepotTestEnvironment& tenv, ui64 tabletId, ui32 groupId); + +TLogoBlobID MinBlobID(ui64 tablet); +TLogoBlobID MaxBlobID(ui64 tablet); + +void TestBasicRange(TBlobDepotTestEnvironment& tenv, ui64 tabletId, ui32 groupId); + +void TestBasicDiscover(TBlobDepotTestEnvironment& tenv, ui64 tabletId, ui32 groupId); + +void TestBasicBlock(TBlobDepotTestEnvironment& tenv, ui64 tabletId, ui32 groupId); + +void TestBasicCollectGarbage(TBlobDepotTestEnvironment& tenv, ui64 tabletId, ui32 groupId); + +void TestRestoreGet(TBlobDepotTestEnvironment& tenv, ui64 tabletId, ui32 groupId, ui32 blobsNum, std::vector<TActorId>* vdisks); + +void TestRestoreDiscover(TBlobDepotTestEnvironment& tenv, ui64 tabletId, ui32 groupId, ui32 blobsNum, std::vector<TActorId>* vdisks); + +void TestRestoreRange(TBlobDepotTestEnvironment& tenv, ui64 tabletId, ui32 groupId, ui32 blobsNum, std::vector<TActorId>* vdisks); + +void TestVerifiedRandom(TBlobDepotTestEnvironment& tenv, ui32 nodeCount, ui64 tabletId0, ui32 groupId, ui32 iterationsNum, ui32 decommitStep = 1e9, std::vector<ui32> probabilities = { 10, 10, 3, 3, 2, 1, 1, 3, 3, 1 }); + +void TestLoadPutAndGet(TBlobDepotTestEnvironment& tenv, ui64 tabletId, ui32 groupId, ui32 blobsNum, ui32 maxBlobSize, ui32 readsNum, bool decommit = false, std::vector<ui32> probabilities = { 5, 1, 5, 5, 1, 1 });
\ No newline at end of file diff --git a/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot/CMakeLists.darwin.txt b/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot/CMakeLists.darwin.txt index bfb5afe045b..ab26eed4f31 100644 --- a/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot/CMakeLists.darwin.txt +++ b/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot/CMakeLists.darwin.txt @@ -28,6 +28,7 @@ target_link_options(ydb-core-blobstorage-ut_blobstorage-ut_blob_depot PRIVATE ) target_sources(ydb-core-blobstorage-ut_blobstorage-ut_blob_depot PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/blob_depot.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/blob_depot_test_functions.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/blob_depot_event_managers.cpp ) add_test( diff --git a/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot/CMakeLists.linux.txt b/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot/CMakeLists.linux.txt index dd689949ff5..b225a4afe50 100644 --- a/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot/CMakeLists.linux.txt +++ b/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot/CMakeLists.linux.txt @@ -32,6 +32,7 @@ target_link_options(ydb-core-blobstorage-ut_blobstorage-ut_blob_depot PRIVATE ) target_sources(ydb-core-blobstorage-ut_blobstorage-ut_blob_depot PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/blob_depot.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/blob_depot_test_functions.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/blob_depot_event_managers.cpp ) add_test( diff --git a/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot_fat/CMakeLists.darwin.txt b/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot_fat/CMakeLists.darwin.txt new file mode 100644 index 00000000000..37e6b7bc0e1 --- /dev/null +++ b/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot_fat/CMakeLists.darwin.txt @@ -0,0 +1,45 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(blobstorage-ut_blobstorage-ut_blob_depot_fat) +target_include_directories(blobstorage-ut_blobstorage-ut_blob_depot_fat PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage +) +target_link_libraries(blobstorage-ut_blobstorage-ut_blob_depot_fat PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + blobstorage-ut_blobstorage-lib +) +target_link_options(blobstorage-ut_blobstorage-ut_blob_depot_fat PRIVATE + -Wl,-no_deduplicate + -Wl,-sdk_version,10.15 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(blobstorage-ut_blobstorage-ut_blob_depot_fat PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/blob_depot_fat.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/blob_depot_test_functions.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/blob_depot_event_managers.cpp +) +add_test( + NAME + blobstorage-ut_blobstorage-ut_blob_depot_fat + COMMAND + blobstorage-ut_blobstorage-ut_blob_depot_fat + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +vcs_info(blobstorage-ut_blobstorage-ut_blob_depot_fat) diff --git a/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot_fat/CMakeLists.linux.txt b/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot_fat/CMakeLists.linux.txt new file mode 100644 index 00000000000..0eec4e7c37c --- /dev/null +++ b/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot_fat/CMakeLists.linux.txt @@ -0,0 +1,49 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(blobstorage-ut_blobstorage-ut_blob_depot_fat) +target_include_directories(blobstorage-ut_blobstorage-ut_blob_depot_fat PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage +) +target_link_libraries(blobstorage-ut_blobstorage-ut_blob_depot_fat PUBLIC + contrib-libs-cxxsupp + yutil + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache + library-cpp-cpuid_check + cpp-testing-unittest_main + blobstorage-ut_blobstorage-lib +) +target_link_options(blobstorage-ut_blobstorage-ut_blob_depot_fat PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(blobstorage-ut_blobstorage-ut_blob_depot_fat PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/blob_depot_fat.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/blob_depot_test_functions.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/blob_depot_event_managers.cpp +) +add_test( + NAME + blobstorage-ut_blobstorage-ut_blob_depot_fat + COMMAND + blobstorage-ut_blobstorage-ut_blob_depot_fat + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +vcs_info(blobstorage-ut_blobstorage-ut_blob_depot_fat) diff --git a/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot_fat/CMakeLists.txt b/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot_fat/CMakeLists.txt new file mode 100644 index 00000000000..fc7b1ee73ce --- /dev/null +++ b/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot_fat/CMakeLists.txt @@ -0,0 +1,13 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (APPLE) + include(CMakeLists.darwin.txt) +elseif (UNIX AND NOT APPLE) + include(CMakeLists.linux.txt) +endif() diff --git a/ydb/core/kqp/ut/kqp_ne_ut.cpp b/ydb/core/kqp/ut/kqp_ne_ut.cpp index 046d21b29c7..4157c7af6e5 100644 --- a/ydb/core/kqp/ut/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/kqp_ne_ut.cpp @@ -3329,6 +3329,24 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { )").ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); } + + Y_UNIT_TEST(PushFlatmapInnerConnectionsToStageInput) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + PRAGMA kikimr.UseNewEngine = "true"; + $subquery = SELECT Key FROM `/Root/KeyValue`; + $subquery2 = SELECT Amount FROM `/Root/Test`; + + SELECT * FROM `/Root/EightShard` + WHERE Key IN $subquery OR Key == 101 OR Key IN $subquery2; + )", TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([[[1];[101u];["Value1"]]])", FormatResultSetYson(result.GetResultSet(0))); + } } } // namespace NKikimr::NKqp diff --git a/ydb/core/protos/msgbus.proto b/ydb/core/protos/msgbus.proto index 0abb62ca11e..fcf4b60b35a 100644 --- a/ydb/core/protos/msgbus.proto +++ b/ydb/core/protos/msgbus.proto @@ -776,8 +776,10 @@ message TTestShardControlRequest { optional uint64 MaxDataBytes = 3; // when total length of stored keys reaches MaxDataBytes... optional uint64 MinDataBytes = 4; // then random keys are collected until total length drops below MinDataBytes optional uint32 MaxInFlight = 5; + optional uint64 ValidateAfterBytes = 9; // number of bytes to write before starting automatic validation repeated TSizeInterval Sizes = 6; // distrubution of generated value size repeated TTimeInterval WritePeriods = 7; // time between two events + repeated TTimeInterval RestartPeriods = 8; // time between automatic restarts } optional uint64 TabletId = 1; diff --git a/ydb/core/test_tablet/load_actor_impl.cpp b/ydb/core/test_tablet/load_actor_impl.cpp index 9e28a3f3476..5556dad5e28 100644 --- a/ydb/core/test_tablet/load_actor_impl.cpp +++ b/ydb/core/test_tablet/load_actor_impl.cpp @@ -2,9 +2,11 @@ namespace NKikimr::NTestShard { - TLoadActor::TLoadActor(ui64 tabletId, ui32 generation, const NKikimrClient::TTestShardControlRequest::TCmdInitialize& settings) + TLoadActor::TLoadActor(ui64 tabletId, ui32 generation, TActorId tablet, + const NKikimrClient::TTestShardControlRequest::TCmdInitialize& settings) : TabletId(tabletId) , Generation(generation) + , Tablet(tablet) , Settings(settings) , StateServerWriteLatency(1024) , WriteLatency(1024) @@ -16,6 +18,10 @@ namespace NKikimr::NTestShard { Settings.GetStorageServerPort())); Send(parentId, new TTestShard::TEvSwitchMode(TTestShard::EMode::STATE_SERVER_CONNECT)); Become(&TThis::StateFunc); + if (Settings.RestartPeriodsSize()) { + TActivationContext::Schedule(GenerateRandomInterval(Settings.GetRestartPeriods()), new IEventHandle( + TEvents::TSystem::Wakeup, 0, SelfId(), {}, nullptr, 0)); + } } void TLoadActor::PassAway() { @@ -26,6 +32,11 @@ namespace NKikimr::NTestShard { TActorBootstrapped::PassAway(); } + void TLoadActor::HandleWakeup() { + STLOG(PRI_NOTICE, TEST_SHARD, TS00, "voluntary restart", (TabletId, TabletId)); + TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, Tablet, TabletActorId, nullptr, 0)); + } + void TLoadActor::Action() { if (ValidationActorId) { // do nothing while validation is in progress return; @@ -37,7 +48,11 @@ namespace NKikimr::NTestShard { return; } } - if (BytesProcessed > 2 * Settings.GetMaxDataBytes()) { // time to perform validation + ui64 barrier = 2 * Settings.GetMaxDataBytes(); + if (Settings.HasValidateAfterBytes()) { + barrier = Settings.GetValidateAfterBytes(); + } + if (BytesProcessed > barrier) { // time to perform validation if (WritesInFlight.empty() && DeletesInFlight.empty() && TransitionInFlight.empty()) { RunValidation(false); } @@ -127,7 +142,7 @@ namespace NKikimr::NTestShard { void TTestShard::StartActivities() { if (!ActivityActorId && Settings) { - ActivityActorId = Register(new TLoadActor(TabletID(), Executor()->Generation(), *Settings), + ActivityActorId = Register(new TLoadActor(TabletID(), Executor()->Generation(), Tablet(), *Settings), TMailboxType::ReadAsFilled, AppData()->UserPoolId); } } diff --git a/ydb/core/test_tablet/load_actor_impl.h b/ydb/core/test_tablet/load_actor_impl.h index 71bd0e4f41e..a89e4b03bcf 100644 --- a/ydb/core/test_tablet/load_actor_impl.h +++ b/ydb/core/test_tablet/load_actor_impl.h @@ -11,6 +11,7 @@ namespace NKikimr::NTestShard { class TLoadActor : public TActorBootstrapped<TLoadActor> { const ui64 TabletId; const ui32 Generation; + const TActorId Tablet; TActorId TabletActorId; const NKikimrClient::TTestShardControlRequest::TCmdInitialize Settings; @@ -40,9 +41,11 @@ namespace NKikimr::NTestShard { }; public: - TLoadActor(ui64 tabletId, ui32 generation, const NKikimrClient::TTestShardControlRequest::TCmdInitialize& settings); + TLoadActor(ui64 tabletId, ui32 generation, const TActorId tablet, + const NKikimrClient::TTestShardControlRequest::TCmdInitialize& settings); void Bootstrap(const TActorId& parentId); void PassAway() override; + void HandleWakeup(); void Action(); void Handle(TEvStateServerStatus::TPtr ev); @@ -53,6 +56,7 @@ namespace NKikimr::NTestShard { hFunc(TEvStateServerWriteResult, Handle); hFunc(TEvValidationFinished, Handle); cFunc(TEvents::TSystem::Poison, PassAway); + cFunc(TEvents::TSystem::Wakeup, HandleWakeup); ) //////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/test_tablet/load_actor_read_validate.cpp b/ydb/core/test_tablet/load_actor_read_validate.cpp index 956c62e515a..b727144ac37 100644 --- a/ydb/core/test_tablet/load_actor_read_validate.cpp +++ b/ydb/core/test_tablet/load_actor_read_validate.cpp @@ -221,7 +221,7 @@ namespace NKikimr::NTestShard { auto& record = ev->Get()->Record; const NKikimrKeyValue::Statuses::ReplyStatus status = record.status(); - if (status != NKikimrKeyValue::Statuses::RSTATUS_TIMEOUT) { + if (status == NKikimrKeyValue::Statuses::RSTATUS_TIMEOUT) { STLOG(PRI_ERROR, TEST_SHARD, TS19, "CmdRangeRead failed", (TabletId, TabletId), (Status, status), (ErrorReason, record.msg())); return IssueNextReadRangeQuery(); diff --git a/ydb/docs/ru/core/best_practices/cdc.md b/ydb/docs/ru/core/best_practices/cdc.md index 9910a8b5f62..7d3febd62f0 100644 --- a/ydb/docs/ru/core/best_practices/cdc.md +++ b/ydb/docs/ru/core/best_practices/cdc.md @@ -50,7 +50,7 @@ CDC представлен объектом схемы данных — пото В реальных сценариях включение CDC практически не влияет время выполнения запросов (вне зависимости от режима), так как почти все данные, необходимые для формирования записей, находятся в кеше, а сами записи отправляются в топик асинхронно. Однако, фоновая активность по отправке записей незначительно (на 1–10%) увеличивает потребление CPU. -В дополнение к вышесказанному, в настоящий момент поток изменений помещается в хранилище (топик) с ограниченной эластичностью. Это означает, что при сильном изменении схемы партицирования таблицы возникает дисбаланс между партициями таблицы и партициями топика. Такой дисбаланс также может приводить к увеличению времени исполнения запросов или дополнительным накладным расходам на хранение потока изменений. +При создании потока изменений для таблицы количество партиций его хранилища (топика) определяется на основании текущего количества партиций таблицы. Если количество партиций исходной таблицы значительно изменяется (например, после загрузки большого объёма данных, или в результате интенсивных обращений), то возникает дисбаланс между партициями таблицы и партициями топика. Такой дисбаланс также может приводить к увеличению времени исполнения запросов на модификацию данных в таблице, либо к излишним накладным расходам на хранение потока изменений. Для устранения дисбаланса можно пересоздать поток изменений. ## Нагрузочное тестирование {#workload} diff --git a/ydb/docs/ru/core/cluster/audit-logs.md b/ydb/docs/ru/core/cluster/audit-logs.md index f681ecce771..53f696e8768 100644 --- a/ydb/docs/ru/core/cluster/audit-logs.md +++ b/ydb/docs/ru/core/cluster/audit-logs.md @@ -1,8 +1,12 @@ # Аудитные логи -В аудитные логи пишется информация обо всех изменениях схемы (успешных и неуспешных) и обо всех изменениях ACL. Аудитные логи записывают в логи `SchemeShard` с пометкой `FLAT_TX_SCHEMESHARD NOTICE: AUDIT:` -### Пример логов -``` +Информация обо всех изменениях схемы (успешных и неуспешных), а также об изменениях ACL записывается в _аудитные логи_. + +## Формат аудитных логов {#format} + +Событие лога состоит из полей `ключ: значение`, разделенных запятыми: + +```text 2022-08-03T22:41:43.860439Z node 1 :FLAT_TX_SCHEMESHARD NOTICE: AUDIT: txId: 281474976710670, database: /Root, subject: no subject, status: StatusSuccess, operation: MODIFY ACL, path: Root, add access: +(CT):user0@builtin, protobuf request: WorkingDir: "" OperationType: ESchemeOpModifyACL ModifyACL { Name: "Root" DiffACL: "\n\031\010\000\022\025\010\001\020@\032\ruser0@builtin \003" } 2022-08-03T22:41:43.931561Z node 1 :FLAT_TX_SCHEMESHARD NOTICE: AUDIT: txId: 281474976710672, database: /Root, subject: user0@builtin, status: StatusAccepted, operation: DROP TABLE, path: /Root/Test1234/KeyValue, protobuf request: WorkingDir: "/Root/Test1234" OperationType: ESchemeOpDropTable Drop { Name: "KeyValue" } @@ -10,27 +14,29 @@ 2022-08-03T22:41:43.895591Z node 1 :FLAT_TX_SCHEMESHARD NOTICE: AUDIT: txId: 281474976710671, database: /Root, subject: user0@builtin, status: StatusAccepted, operation: CREATE DIRECTORY, path: /Root/Test1234, protobuf request: WorkingDir: "/Root" OperationType: ESchemeOpMkDir MkDir { Name: "Test1234" } FailOnExist: true, operation: CREATE TABLE, path: /Root/Test1234/KeyValue, protobuf request: WorkingDir: "/Root/Test1234" OperationType: ESchemeOpCreateTable CreateTable { Name: "KeyValue" Columns { Name: "Key" Type: "Uint32" NotNull: false } Columns { Name: "Value" Type: "String" NotNull: false } KeyColumnNames: "Key" PartitionConfig { ColumnFamilies { Id: 0 StorageConfig { SysLog { PreferredPoolKind: "test" } Log { PreferredPoolKind: "test" } Data { PreferredPoolKind: "test" } } } } } FailOnExist: false ``` -### Формат аудитных логов -Аудитные логи пишутся в формате пар `ключ: значение`. Каждая такая пара отделяется запятой. В каждой записи может быть описано несколько подопераций. Из-за этого часть полей в аудитных логах будут принадлежать всей транзакции - они пишутся один раз, другие поля принадлежат конкретным подоперациям и могут повторяться в зависимости от количества подопераций +Одно событие описывает одну транзакцию. В событии может быть описано несколько операций, выполненных в рамках одной транзакции. В этом случае часть полей будут описывать [события транзакции](#tx-fields), а часть полей — [события операций](#sub-operation-fields) внутри транзакции. + +### Поля транзакции {#tx-fields} + +* `txId` — (обязательно) уникальный идентификатор транзакции. +* `database` — (опционально) путь к базе данных. +* `subject` — (обязательно) SID источника события (формат `<login>@<subsystem>`). Если не определен, значение `no subject`. +* `status` — (обязательно) статус завершения транзакции. +* `reason` — (опционально) сообщение об ошибке. -##### Поля транзакции -- `txId` - *(required)* уникальный идентификатор транзакции -- `database` - *(optional)* путь к базе данных -- `subject` - *(required)* SID инициатора (формат `<login>@<subsystem>`). если не определен, значение `no subject` -- `status` - *(required)* статус выполнения транзакции. ниже [список](#statuses) всевозможных статусов -- `reason` - *(optional)* причина ошибки, если что-то пошло не так +### Поля операции {#sub-operation-fields} -##### Поля подоперации -- `operation` - *(required)* название операции. ниже [список](#names) всевозможных операций -- `path` - *(optional)* путь к объекту изменения -- `src path` - *(optional)* путь к исходному объекту. для операций копирования/перемещения -- `dst path` - *(optional)* путь к конечному объекту. для операций копирования/перемещения -- `no path` - *(optional)* если объект изменения отсутствует, в лог пишется статус `no path`. у этого поля нет значения -- `set owner` - *(optional)* новый владелец, при изменении ACL -- `add access` - *(repeated)* добавление доступа, при изменении ACL -- `remove access` - *(repeated)* удаление доступа, при изменении ACL -- `protobuf request` - *(optional)* протобуф на изменение схемы или ACL +* `operation` — (обязательно) название операции. +* `path` — (опционально) путь к объекту изменения. +* `src path` — (опционально) путь к исходному объекту (для операций копирования и перемещения). +* `dst path` — (опционально) путь к конечному объекту (для операций копирования и перемещения). +* `no path` — (опционально) если объект изменения отсутствует, содержит значение `no path`. +* `set owner` — (опционально) новый владелец при изменении ACL. +* `add access` — (опционально) добавление доступа при изменении ACL. Поле может повторятся. +* `remove access` — (опционально) удаление доступа при изменении ACL. Поле может повторяться. +* `protobuf request` — (опционально) Описание изменения схемы или ACL в формате protobuf. +<!-- ### <a name="statuses"></a>Список возможных статусов - StatusSuccess - StatusAccepted @@ -132,4 +138,4 @@ - CREATE BLOB DEPOT - ALTER BLOB DEPOT - DROP BLOB DEPOT -- ALTER TABLE INDEX RENAME +- ALTER TABLE INDEX RENAME --> diff --git a/ydb/docs/ru/core/cluster/toc_i.yaml b/ydb/docs/ru/core/cluster/toc_i.yaml index ed0c1f762ac..facfe02c096 100644 --- a/ydb/docs/ru/core/cluster/toc_i.yaml +++ b/ydb/docs/ru/core/cluster/toc_i.yaml @@ -9,6 +9,8 @@ items: include: { mode: link, path: ../maintenance/embedded_monitoring/toc_p.yaml } - name: Системные таблицы кластера href: ../troubleshooting/system_views_cluster.md +- name: Аудитные логи + href: audit-logs.md - name: Мониторинг items: - name: Настройка мониторинга локального кластера YDB diff --git a/ydb/docs/ru/core/concepts/topic.md b/ydb/docs/ru/core/concepts/topic.md index ed0912df3d1..284e2ff344f 100644 --- a/ydb/docs/ru/core/concepts/topic.md +++ b/ydb/docs/ru/core/concepts/topic.md @@ -31,7 +31,7 @@ Все сообщения внутри партиции имеют уникальный порядковый номер, называемый `офсетом`. Офсет монотонно возрастает при записи новых сообщений. -## Источники и группы сообщений{#producer-id} +## Источники и группы сообщений {#producer-id} Идентификатор источника, `producer_id`, и идентификатор группы сообщений, `message_group_id`, — это способ упорядочить набор сообщений. Порядок записанных сообщений сохраняется в пределах пар: <идентификатор источника, идентификатор группы сообщений>. @@ -110,11 +110,11 @@ Файл | Смещение передаваемых данных от начала в файле | Нельзя удалять строки из начала файла, так как это приведет или пропуску части данных, как к дублям, либо к потере части данных. Таблица базы данных | Автоинкрементный идентификатор записи | -## Время хранения сообщений { #retention-time } +## Время хранения сообщений {#retention-time} -Для каждого топика определено время хранения сообщений. После истечения времени хранения сообщения автоматически удаляются. Исключение составляют данные, которые еще не были прочитаны ["важным"](#important-consumer) читателем — они будут храниться до тех пор, пока читатель их не прочитает. +Для каждого топика определено время хранения сообщений. После истечения времени хранения сообщения автоматически удаляются. Исключение составляют данные, которые еще не были прочитаны ["важным"](#important-consumer) читателем — они будут храниться до тех пор, пока читатель их не прочитает. -## Сжатие данных { #message-codec } +## Сжатие данных {#message-codec} При передаче приложение-писатель указывает, что сообщение может быть сжато одним из поддерживаемых кодеков. Название кодека передается при записи и сохраняется вместе с сообщением, а также возвращается на чтении. Сжатие сообщений происходит по каждому сообщению в отдельности, сжатие пакета сообщений не поддерживается. Операции сжатия-разжатия данных производятся на стороне приложений-читателей и -писателей. @@ -129,15 +129,15 @@ {% endif %} `zstd` | Сжатие алгоритмом [zstd](https://en.wikipedia.org/wiki/Zstd). -## Читатель { #consumer } +## Читатель {#consumer} Читатель — это именованная сущность для чтения данных из топика. Читатель содержит позиции чтения, подтвержденные читателем по каждому топику, читаемого от его имени. -### Позиция чтения { #consumer-offset } +### Позиция чтения {#consumer-offset} Позиция чтения — это сохраненный [офсет](#offset) читателя по каждой партиции топика. Позиция чтения сохраняется читателем после отправки подтверждения прочитанных данных. При установке новой сессии чтения сообщения поступают читателю начиная с сохраненной позиции чтения. Это позволяет пользователям не хранить позицию чтения на своей стороне. -### Важный читатель { #important-consumer } +### Важный читатель {#important-consumer} Читатель может обладать признаком "важный". Наличие этого признака означает, что сообщения в топике не будут удаляться до тех пор, пока читатель не прочитает и не подтвердит сообщения. Этот признак можно устанавливать для самых критичных читателей, которые должны обработать все данные даже при длительном простое. diff --git a/ydb/docs/ru/core/faq/_includes/common.md b/ydb/docs/ru/core/faq/_includes/common.md index a277155b4bc..1c8f7cb3180 100644 --- a/ydb/docs/ru/core/faq/_includes/common.md +++ b/ydb/docs/ru/core/faq/_includes/common.md @@ -69,3 +69,7 @@ description: "Что такое YDB? Для каких задач стоит и� #### Как удалять устаревшие данные? {#ttl} Для эффективного удаления устаревших данных рекомендуется использовать [TTL](../../concepts/ttl.md). + +#### Как происходит синхронизация данных между дата-центрами в геораспределенных кластерах {#sinc-between-dc} + +[Таблетка](../../concepts/cluster/common_scheme_ydb.md#tablets)-лидер записывает данные в [распределенное сетевое хранилище](../../concepts/cluster/distributed_storage.md), которое сохраняет копии в нескольких дата-центрах. {{ ydb-short-name }} подтверждает пользователю запрос только после успешного сохранения нужного числа копий в нужном числе дата-центров. diff --git a/ydb/docs/ru/core/reference/ydb-sdk/toc_i.yaml b/ydb/docs/ru/core/reference/ydb-sdk/toc_i.yaml index 1b8a4e8d071..0a1faf4e265 100644 --- a/ydb/docs/ru/core/reference/ydb-sdk/toc_i.yaml +++ b/ydb/docs/ru/core/reference/ydb-sdk/toc_i.yaml @@ -5,8 +5,8 @@ items: href: install.md - name: Аутентификация href: auth.md -# - name: Работа с топиками -# href: topic/topic.md + - name: Работа с топиками + href: topic.md - name: Тестовое приложение include: { mode: link, path: example/toc_p.yaml } - name: Обработка ошибок в API diff --git a/ydb/docs/ru/core/reference/ydb-sdk/topic.md b/ydb/docs/ru/core/reference/ydb-sdk/topic.md new file mode 100644 index 00000000000..9cf237caf5f --- /dev/null +++ b/ydb/docs/ru/core/reference/ydb-sdk/topic.md @@ -0,0 +1,248 @@ +# Работа с топиками + +В этой статье приведены примеры использования {{ ydb-short-name }} SDK для работы с [топиками](../../concepts/topic.md). + +Перед выполнением примеров [создайте топик](../ydb-cli/topic.md#topic-create) и [добавьте читателя](../ydb-cli/topic.md#consumer-add). + +## Подключение к топику {#start-reader} + +Чтобы создать подключение к существующему топику `my-topic` через добавленного ранее читателя `my-consumer`, используйте следующий код: + +{% list tabs %} + +- Go + + ```go + reader, err := db.Topic().StartReader("my-consumer", topicoptions.ReadTopic("my-topic")) + if err != nil { + return err + } + ``` + +{% endlist %} + +Вы также можете использовать расширенный вариант создания подключения, чтобы указать несколько топиков и задать параметры чтения. Следующий код создаст подключение к топикам `my-topic` и `my-specific-topic` через читателя `my-consumer`, а также задаст время, с которого начинать читать сообщения: + +{% list tabs %} + +- Go + + ```go + reader, err := db.Topic().StartReader("my-consumer", []topicoptions.ReadSelector{ + { + Path: "my-topic", + }, + { + Path: "my-specific-topic", + ReadFrom: time.Date(2022, 7, 1, 10, 15, 0, 0, time.UTC), + }, + }, + ) + if err != nil { + return err + } + ``` + +{% endlist %} + +## Чтение сообщений {#reading-messages} + +Сервер хранит [позицию чтения сообщений](../../concepts/topic.md#consumer-offset). После вычитывания очередного сообщения клиент может [отправить на сервер подтверждение обработки](#commit). Позиция чтения изменится, а при новом подключении будут вычитаны только неподтвержденные сообщения. + +Читать сообщения можно и [без подтверждения обработки](#no-commit). В этом случае при новом подключении будут прочитаны все неподтвержденные сообщения, в том числе и уже обработанные. + +Информацию о том, какие сообщения уже обработаны, можно [сохранять на клиентской стороне](#client-commit), передавая на сервер стартовую позицию чтения при создании подключения. При этом позиция чтения сообщений на сервере не изменяется. + +SDK получает данные с сервера партиями и буферизирует их. В зависимости от задач клиентский код может читать сообщения из буфера по одному или пакетами. + +### Чтение без подтверждения обработки сообщений {#no-commit} + +Чтобы читать сообщения по одному, используйте следующий код: + +{% list tabs %} + +- Go + + ```go + func SimpleReadMessages(ctx context.Context, r *topicreader.Reader) error { + for { + mess, err := r.ReadMessage(ctx) + if err != nil { + return err + } + processMessage(mess) + } + } + ``` + +{% endlist %} + +Чтобы прочитать пакет сообщений, используйте следующий код: + +{% list tabs %} + +- Go + + ```go + func SimpleReadBatches(ctx context.Context, r *topicreader.Reader) error { + for { + batch, err := r.ReadMessageBatch(ctx) + if err != nil { + return err + } + processBatch(batch) + } + } + ``` + +{% endlist %} + +### Чтение с подтверждением обработки сообщений {#commit} + +Чтобы подтверждать обработку сообщений по одному, используйте следующий код: + +{% list tabs %} + +- Go + + ```go + func SimpleReadMessages(ctx context.Context, r *topicreader.Reader) error { + for { + mess, err := r.ReadMessage(ctx) + if err != nil { + return err + } + processMessage(mess) + r.Commit(mess.Context(), mess) + } + } + ``` + +{% endlist %} + +Для подтверждения обработки пакета сообщений используйте следующий код: + +{% list tabs %} + +- Go + + ```go + func SimpleReadMessageBatch(ctx context.Context, r *topicreader.Reader) error { + for { + batch, err := r.ReadMessageBatch(ctx) + if err != nil { + return err + } + processBatch(batch) + r.Commit(batch.Context(), batch) + } + } + ``` + +{% endlist %} + +### Чтение с хранением позиции на клиентской стороне {#client-commit} + +При начале чтения клиентский код должен сообщить серверу стартовую позицию чтения: + +{% list tabs %} + +- Go + + ```go + func ReadWithExplicitPartitionStartStopHandlerAndOwnReadProgressStorage(ctx context.Context, db ydb.Connection) error { + readContext, stopReader := context.WithCancel(context.Background()) + defer stopReader() + + readStartPosition := func( + ctx context.Context, + req topicoptions.GetPartitionStartOffsetRequest, + ) (res topicoptions.GetPartitionStartOffsetResponse, err error) { + offset, err := readLastOffsetFromDB(ctx, req.Topic, req.PartitionID) + res.StartFrom(offset) + + // Reader will stop if return err != nil + return res, err + } + + r, err := db.Topic().StartReader("my-consumer", topicoptions.ReadTopic("my-topic"), + topicoptions.WithGetPartitionStartOffset(readStartPosition), + ) + if err != nil { + return err + } + + go func() { + <-readContext.Done() + _ = r.Close(ctx) + }() + + for { + batch, err := r.ReadMessageBatch(readContext) + if err != nil { + return err + } + + processBatch(batch) + _ = externalSystemCommit(batch.Context(), batch.Topic(), batch.PartitionID(), batch.EndOffset()) + } + } + ``` + +{% endlist %} + +## Обработка серверного прерывания чтения {#stop} + +В {{ ydb-short-name }} используется серверная балансировка партиций между клиентами. Это означает, что сервер может прерывать чтение сообщений из произвольных партиций. + +При _мягком прерывании_ клиент получает уведомление, что сервер уже закончил отправку сообщений из партиции и больше сообщения читаться не будут. Клиент может завершить обработку сообщений и отправить подтверждение на сервер. + +В случае _жесткого прерывания_ клиент получает уведомление, что работать с сообщениями партиции больше нельзя. Клиент должен прекратить обработку прочитанных сообщений. Неподтвержденные сообщения будут переданы другому читателю. + +### Мягкое прерывание чтения {#soft-stop} + +{% list tabs %} + +- Go + + Клиентский код сразу получает все имеющиеся в буфере (на стороне SDK) сообщения, даже если их не достаточно для формирования пакета при групповой обработке. + + ```go + r, _ := db.Topic().StartReader("my-consumer", nil, + topicoptions.WithBatchReadMinCount(1000), + ) + + for { + batch, _ := r.ReadMessageBatch(ctx) // <- if partition soft stop batch can be less, then 1000 + processBatch(batch) + _ = r.Commit(batch.Context(), batch) + } + + ``` + +{% endlist %} + +### Жесткое прерывание чтения {#hard-stop} + +{% list tabs %} + +- Go + + При прерывании чтения контекст сообщения или пакета сообщений будет отменен. + + ```go + ctx := batch.Context() // batch.Context() will cancel if partition revoke by server or connection broke + if len(batch.Messages) == 0 { + return + } + + buf := &bytes.Buffer{} + for _, mess := range batch.Messages { + buf.Reset() + _, _ = buf.ReadFrom(mess) + _, _ = io.Copy(buf, mess) + writeMessagesToDB(ctx, buf.Bytes()) + } + ``` + +{% endlist %} diff --git a/ydb/docs/ru/core/reference/ydb-sdk/topic/topic.md b/ydb/docs/ru/core/reference/ydb-sdk/topic/topic.md deleted file mode 100644 index c43ed94c798..00000000000 --- a/ydb/docs/ru/core/reference/ydb-sdk/topic/topic.md +++ /dev/null @@ -1,299 +0,0 @@ -# Работа с топиками -## Основные понятия -### Сообщение -Минимальная, неделимая единица пользовательской информации. Состоит из тела сообщения, свойств сообщения и атрибутов сессии записи. - -### Тело сообщения -Произвольный набор байт, YDB никак не интерпретирует это содержимое. - -### Свойства сообщения -Типизированные поля сообщения, приходящие вне основного тела сообщения и имеющие предопределённый смысл. - -#### codec -Способ, которым закодировано сообщение, обычно указывается алгоритм сжатия, который был использован. SDK будет применять соответствующий алгоритм для расжатия сообщения перед тем как отдать его в клиентский код. - -#### created_at -Время создания сообщения, указывается отправителем, передаётся читателю "как есть", без проверки на стороне сервера - -#### message_group_id -Задаётся отправителем опционально, используется для разбиения сообщений по партициям. - -#### offset -Порядковый номер сообщения внутри партиции, присваивается сервером при сохранении сообщения. У первого сообщения партиции оффсет равен 0, дальше возрастает. В offset-ах могут быть пропуски. - -#### uncompressed_size -Размер расжатого сообщения, задаётся отправителем и передаётся читателю "как есть", без проверки на стороне сервера. - -#### seq_no -Порядковый номер сообщения внутри одного ProducerID. Задаётся писателем сообщения перед отправкой на сервер. -Должен идти по позрастанию внутри ProducerID. - -#### producer_id -ID, задаваемый отправителем. В пределах партиции для каждого producer_id гарантируется возрастание seq_no. - -#### written_at -Время записи сообщения на сервер, задаётся сервером при сохранении сообщения. - -#### write_session_meta -Набор строковых атрибутов ключ/значение, задаваемых отправителем при старте сессии записи. Атрибуты сессии будут одинаковые для всех сообщений, записанных внутри одной сессии. - -### Коммит сообщения -Подтверждение факта обработки сообщения читателем. Означает что читатель обработал сообщение и более в нём не нуждается. Коммиты сообщений независимы для разных consumer-ов. - -### Топик -Именованный набор сообщений. Чтение и запись сообщений ведётся через топики. - -### Партиция -Единица масштабирования топика. Партиции внутри топика пронумерованы, начиная с 0. В конечном итоге сообщения сохраняются в партиции. Сообщения внутри партиции упорядочены и пронумерованы. - -### Читатель -Именованная сущность для чтения данных из топика. Читатель содержит подтверждённые позиции чтения, сохраняемые на стороне сервера. - -### Важный читатель -Читатель, обладающий признаком "важный". Наличие этого признака означает что сообщение не будет удаляться из топика до подтверждения обработки важным читателем, даже если его уже пора удалять по правилам ротации. Долгий простой важного читателя может привести к полному исчерпанию места на диске. - -## Гарантии -В общем случае гарантируется доставка сообщений минимум один раз (least once). - -### Запись сообщений -1. После подтверждения записи сервером сообщение считается надёжно сохранённым и будет доставлено получателям -2. Сообщения с одинаковым message_group_id попадают в одну партицию. -3. message_group_id и партиция не заданы явно - сообщения с одинаковыми producer_id попадут в одну партицию. -4. При записи сообщения в партиции сохраняется их порядок внутри одного producer_id. -5. Если при записи сообщения в партицию seq_no оказывается меньше или равен seq_no ранее подтверждённого сообщения для того же producer_id - сообщение пропускается и не записывается. - -### Чтение сообщений -1. Из каждой партиции сообщения приходят упорядоченными по возрастанию offset -2. Из каждой партиции сообщения приходят с возрастающим seq_no в рамках одного producer_id -3. После подтверждение сервером коммита сообщения оно больше не будет отправляться этому consumer-у - -## Работа с топиками из SDK -### Подключение к топику -Для чтения сообщения из топика нужно подключиться к базе TODO:ССЫЛКА и подписаться на топик - -{% list tabs %} - -- Go - - ```go - reader, err := db.Topic().StartReader("consumer", topicoptions.ReadTopic("asd")) - if err != nil { - return err - } - ``` - -{% endlist %} - -При необходимости читать сообщения из нескольких топиков или задать более точные опции чтения можно использовать расширенный вариант создания читателя - -{% list tabs %} - -- Go - - ```go - reader, err := db.Topic().StartReader("consumer", []topicoptions.ReadSelector{ - { - Path: "test", - }, - { - Path: "test-2", - Partitions: []int64{1, 2, 3}, - ReadFrom: time.Date(2022, 7, 1, 10, 15, 0, 0, time.UTC), - }, - }, - ) - if err != nil { - return err - } - ``` - -{% endlist %} - - -### Чтение сообщений - -Порядок сообщений гарантируется сервером внутри одной партиции. Те же гарантии распространяются и на SDK: сообщения из одной партиции будет упорядочены между собой. При этом сообщения из разных партиций могут приходить не в том порядке как они были записаны на сервер и не в том порядке как сервер отдал их клиенту. При этом исходные гарантии об упорядоченности сообщений внутри партиции сохраняются на всём пути сообщения - от сохранения в партицию до передачи в клиентский код. - -Чтение сообщений по одному: - -{% list tabs %} - -- Go - - ```go - func SimpleReadMessages(ctx context.Context, r *topicreader.Reader) error { - for { - mess, err := r.ReadMessage(ctx) - if err != nil { - return err - } - processMessage(mess) - } - } - ``` - -{% endlist %} - - -При групповой обработке сообщений удобнее получать их пачками - в этом случае все сообщения внутри пачки будут из одной партиции. - -{% list tabs %} - -- Go - - ```go - func ReadBatchesWithBatchCommit(ctx context.Context, r *topicreader.Reader) error { - for { - batch, err := r.ReadMessageBatch(ctx) - if err != nil { - return err - } - processBatch(batch) - } - } - ``` - -{% endlist %} - -### Подтверждение обработки сообщений (commit) -Сервер может сохранять на своей стороне позицию обработанных сообщений - для этого нужно отправлять на сервер подтверждения обработки. Это опциональная возможность и часто она позволяет сделать код проще. - -Обработку сообщений можно подтверждать по одному: - -{% list tabs %} - -- Go - - ```go - func SimpleReadMessages(ctx context.Context, r *topicreader.Reader) error { - for { - ... - r.Commit(mess.Context(), mess) - } - } - ``` - -{% endlist %} - -и пачками - -{% list tabs %} - -- Go - - ```go - func SimpleReadMessages(ctx context.Context, r *topicreader.Reader) error { - for { - ... - r.Commit(batch.Context(), batch) - } - } - ``` - -{% endlist %} - -### Работа без подтверждения обработки -При необходимости читать сообщения без сохранения прогресса в топике - нужно сохранять его на своей стороне и обрабатывать служебные сообщения о начале чтения партиций - чтобы сообщать серверу с какого момента ему продолжать передачу. Без такой обработки сервер будет каждый раз отправлять все имеющиеся сообщения. - -{% list tabs %} - -- Go - - ```go - func ReadWithExplicitPartitionStartStopHandlerAndOwnReadProgressStorage(ctx context.Context, db ydb.Connection) error { - readContext, stopReader := context.WithCancel(context.Background()) - defer stopReader() - - readStartPosition := func( - ctx context.Context, - req topicoptions.GetPartitionStartOffsetRequest, - ) (res topicoptions.GetPartitionStartOffsetResponse, err error) { - offset, err := readLastOffsetFromDB(ctx, req.Topic, req.PartitionID) - res.StartFrom(offset) - - // Reader will stop if return err != nil - return res, err - } - - r, err := db.Topic().StartReader("consumer", topicoptions.ReadTopic("asd"), - topicoptions.WithGetPartitionStartOffset(readStartPosition), - ) - if err != nil { - return err - } - - go func() { - <-readContext.Done() - _ = r.Close(ctx) - }() - - for { - batch, err := r.ReadMessageBatch(readContext) - if err != nil { - return err - } - - processBatch(batch) - _ = externalSystemCommit(batch.Context(), batch.Topic(), batch.PartitionID(), batch.EndOffset()) - } - } - ``` - -{% endlist %} - -### Отбор партиции -В YDB используется серверная балансировка партиций между подключившимися клиентами, поэтому сервер по своей инициативе может прекратить отправку сообщений клиенту из некоторых партиций. В этом случае клиент должен завершить обработку полученных сообщений. - -У сервера есть два способа забрать партицию: мягкий (с предварительным уведомлением) и жёсткий (сообщение что с партицией работать уже нельзя). - -Мягкий вариант уведомления означает что сервер уже закончил отправку сообщений из этой партиции и больше сообщений сюда отправлять не будет, при этом у клиента ещё есть время закончить обработку полученных сообщений. - -Обработка мягкого отбора партиции. -{% list tabs %} - -- Go - - В основном API SDK отдельного уведомления о мягком отборе партиции нет. - Внутри SDK обрабатывает сигнал таким образом, что сразу отдаёт пользователю оставшиеся в буфере сообщения даже если - по настройкам нужно собрать пачку побольше - - ```go - r, _ := db.Topic().StartReader("consumer", nil, - topicoptions.WithBatchReadMinCount(1000), - ) - - for { - batch, _ := r.ReadMessageBatch(ctx) // <- if partition soft stop batch can be less, then 1000 - processBatch(batch) - _ = r.Commit(batch.Context(), batch) - } - - ``` -{% endlist %} - -Жесткий вариант означает что клиент должен прекратить обработку полученных сообщений, т.к. все неподтверждённые собщения будут переданы другому читателю. - -Обработка жёсткого отбора партиции. -{% list tabs %} - -- Go - - У каждого сообщения (и на пачке если сообщения читаются пачками) есть контекст сообщения. Если партиция отобрана - у пачек и сообщений - из этой партиции контекст будет отменён. - - ```go - ctx := batch.Context() // batch.Context() will cancel if partition revoke by server or connection broke - if len(batch.Messages) == 0 { - return - } - - buf := &bytes.Buffer{} - for _, mess := range batch.Messages { - buf.Reset() - _, _ = buf.ReadFrom(mess) - _, _ = io.Copy(buf, mess) - writeMessagesToDB(ctx, buf.Bytes()) - } - ``` - -{% endlist %} diff --git a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json index a6d5dff85d8..6057f723c4d 100644 --- a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json +++ b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json @@ -1777,6 +1777,16 @@ "Match": {"Type": "Callable", "Name": "TableRecord"} }, { + "Name": "TCoWriteTime", + "Base": "TCoTablePropBase", + "Match": {"Type": "Callable", "Name": "WriteTime"} + }, + { + "Name": "TCoOffset", + "Base": "TCoTablePropBase", + "Match": {"Type": "Callable", "Name": "Offset"} + }, + { "Name": "TCoIsKeySwitch", "Base": "TCallable", "Match": {"Type": "Callable", "Name": "IsKeySwitch"}, diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index 39fa6a61781..9100865c6fc 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -265,11 +265,13 @@ private: void PeerFinished(ui64 channelId) override { // no TaskRunner => no outputChannel.Channel, nothing to Finish + CA_LOG_I("Peer finished, channel: " << channelId); TOutputChannelInfo* outputChannel = OutputChannelsMap.FindPtr(channelId); YQL_ENSURE(outputChannel, "task: " << Task.GetId() << ", output channelId: " << channelId); outputChannel->Finished = true; - Send(TaskRunnerActorId, MakeHolder<NTaskRunnerActor::TEvPush>(channelId, /* finish = */ true, /* askFreeSpace = */ false, /* pauseAfterPush = */ false, /* isOut = */ true), 0, Cookie); // finish channel + ProcessOutputsState.Inflight++; + Send(TaskRunnerActorId, MakeHolder<NTaskRunnerActor::TEvPop>(channelId, /* wasFinished = */ true, 0)); // finish channel DoExecute(); } diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index c056b6325e6..30da39c2d49 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -365,6 +365,7 @@ protected: void ProcessOutputsImpl(ERunStatus status) { ProcessOutputsState.LastRunStatus = status; + CA_LOG_D("ProcessOutputsState.Inflight: " << ProcessOutputsState.Inflight ); if (ProcessOutputsState.Inflight == 0) { ProcessOutputsState = TProcessOutputsState(); } @@ -1500,10 +1501,12 @@ protected: return; } + CA_LOG_T("Poll sources"); for (auto& [inputIndex, source] : SourcesMap) { PollAsyncInput(source, inputIndex); } + CA_LOG_T("Poll inputs"); for (auto& [inputIndex, transform] : InputTransformsMap) { PollAsyncInput(transform, inputIndex); } diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp index 7eb8e67d678..c8ac3afd65d 100644 --- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp +++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp @@ -262,6 +262,10 @@ private: auto channelId = ev->Get()->ChannelId; auto channel = TaskRunner->GetOutputChannel(channelId); + if (ev->Get()->WasFinished) { + channel->Finish(); + LOG_I("output channel with id [" << channelId << "] finished prematurely"); + } int maxChunks = std::numeric_limits<int>::max(); auto wasFinished = ev->Get()->WasFinished; bool changed = false; diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp index 77640baf044..d90ad856f1d 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp @@ -12,23 +12,29 @@ using namespace NYql::NNodes; namespace { -TVector<TCoArgument> PrepareArgumentsReplacement(const TCoArguments& args, const TVector<TDqConnection>& newInputs, +TVector<TCoArgument> PrepareArgumentsReplacement(const TExprBase& node, const TVector<TDqConnection>& newInputs, TExprContext& ctx, TNodeOnNodeOwnedMap& replaceMap) { TVector<TCoArgument> newArgs; - newArgs.reserve(args.Size() + newInputs.size()); replaceMap.clear(); - for (size_t i = 0; i < args.Size(); ++i) { - TCoArgument newArg{ctx.NewArgument(args.Pos(), TStringBuilder() - << "_dq_replace_arg_" << i)}; - replaceMap[args.Arg(i).Raw()] = newArg.Ptr(); - newArgs.emplace_back(newArg); + if (auto maybeArgs = node.Maybe<TCoArguments>()) { + auto args = maybeArgs.Cast(); + + newArgs.reserve(args.Size() + newInputs.size()); + for (size_t i = 0; i < args.Size(); ++i) { + TCoArgument newArg{ctx.NewArgument(node.Pos(), TStringBuilder() + << "_dq_replace_arg_" << i)}; + replaceMap[args.Arg(i).Raw()] = newArg.Ptr(); + newArgs.emplace_back(newArg); + } + } else { + newArgs.reserve(newInputs.size()); } for (size_t i = 0; i < newInputs.size(); ++i) { - TCoArgument newArg{ctx.NewArgument(args.Pos(), TStringBuilder() - << "_dq_replace_input_arg_" << args.Size() + i)}; + TCoArgument newArg{ctx.NewArgument(node.Pos(), TStringBuilder() + << "_dq_replace_input_arg_" << newArgs.size())}; replaceMap[newInputs[i].Raw()] = newArg.Ptr(); newArgs.emplace_back(newArg); } @@ -196,6 +202,101 @@ TExprBase DqPushMembersFilterToStage(TExprBase node, TExprContext& ctx, IOptimiz return result.Cast(); } +TMaybeNode<TDqStage> DqPushFlatMapInnerConnectionsToStageInput(TCoFlatMapBase& flatmap, + TExprNode::TListType&& innerConnections, TExprContext& ctx) +{ + TVector<TDqConnection> inputs; + TNodeOnNodeOwnedMap replaceMap; + + // prepare inputs (inner connections + flatmap input) + inputs.reserve(innerConnections.size() + 1); + inputs.push_back(flatmap.Input().Cast<TDqConnection>()); + for (auto& cn : innerConnections) { + if (!TMaybeNode<TDqCnUnionAll>(cn).IsValid() && !TMaybeNode<TDqCnMerge>(cn).IsValid()) { + return {}; + } + + inputs.push_back(TDqConnection(cn)); + } + + auto args = PrepareArgumentsReplacement(flatmap.Input(), inputs, ctx, replaceMap); + auto newFlatMap = ctx.ReplaceNodes(flatmap.Ptr(), replaceMap); + + auto buildDqBroadcastCn = [&ctx](auto& cn) { + auto collectStage = Build<TDqStage>(ctx, cn.Pos()) + .Inputs() + .Add(cn) + .Build() + .Program() + .Args({"stream"}) + .Body("stream") + .Build() + .Settings(TDqStageSettings().BuildNode(ctx, cn.Pos())) + .Done(); + + return Build<TDqCnBroadcast>(ctx, cn.Pos()) + .Output() + .Stage(collectStage) + .Index().Build("0") + .Build() + .Done(); + }; + + TVector<TExprBase> stageInputs; + stageInputs.reserve(inputs.size()); + auto mapCn = Build<TDqCnMap>(ctx, flatmap.Input().Pos()) + .Output(inputs[0].Output()) + .Done(); + stageInputs.emplace_back(std::move(mapCn)); + + // gather all elements from stream inputs (skip flatmap input) + for (ui32 inputId = 1; inputId < inputs.size(); ++inputId) { + auto argAsList = ctx.NewArgument(inputs[inputId].Pos(), TStringBuilder() << "_dq_list_arg" << inputId); + auto stageInput = buildDqBroadcastCn(inputs[inputId]); + + auto condenseInput = Build<TCoCondense>(ctx, stageInput.Pos()) + .Input(args[inputId]) + .State<TCoList>() + .ListType<TCoTypeOf>() + .Value(stageInput) + .Build() + .Build() + .SwitchHandler() + .Args({"item", "state"}) + .Body(MakeBool<false>(stageInput.Pos(), ctx)) + .Build() + .UpdateHandler() + .Args({"item", "state"}) + .Body<TCoAppend>() + .List("state") + .Item("item") + .Build() + .Build() + .Done().Ptr(); + + newFlatMap = Build<TCoFlatMap>(ctx, stageInput.Pos()) + .Input(condenseInput) + .Lambda() + .Args({argAsList}) + .Body(ctx.ReplaceNode(std::move(newFlatMap), args[inputId].Ref(), argAsList)) + .Build() + .Done().Ptr(); + + stageInputs.emplace_back(std::move(stageInput)); + } + + return Build<TDqStage>(ctx, flatmap.Pos()) + .Inputs() + .Add(std::move(stageInputs)) + .Build() + .Program() + .Args(args) + .Body(newFlatMap) + .Build() + .Settings(TDqStageSettings().BuildNode(ctx, flatmap.Pos())) + .Done(); +} + } // namespace TMaybeNode<TDqStage> DqPushLambdaToStage(const TDqStage& stage, const TCoAtom& outputIndex, TCoLambda& lambda, @@ -350,41 +451,56 @@ TExprBase DqBuildFlatmapStage(TExprBase node, TExprContext& ctx, IOptimizationCo return node; } - if (!IsDqPureExpr(flatmap.Lambda())) { - return node; - } + auto filter = [](const TExprNode::TPtr& node) { + return !TMaybeNode<TDqPhyPrecompute>(node).IsValid(); + }; - if (auto connToPushableStage = DqBuildPushableStage(dqUnion, ctx)) { - return TExprBase(ctx.ChangeChild(*node.Raw(), TCoFlatMapBase::idx_Input, std::move(connToPushableStage))); - } + auto predicate = [](const TExprNode::TPtr& node) { + return TMaybeNode<TDqSource>(node).IsValid() || + TMaybeNode<TDqConnection>(node).IsValid(); + }; - auto lambda = TCoLambda(ctx.Builder(flatmap.Lambda().Pos()) - .Lambda() - .Param("stream") - .Callable(flatmap.Ref().Content()) - .Arg(0, "stream") - .Add(1, ctx.DeepCopyLambda(flatmap.Lambda().Ref())) - .Seal() - .Seal().Build()); + auto innerConnections = FindNodes(flatmap.Lambda().Body().Ptr(), filter, predicate); - auto pushResult = DqPushLambdaToStageUnionAll(dqUnion, lambda, {}, ctx, optCtx); - if (pushResult) { - return pushResult.Cast(); - } + TMaybeNode<TDqStage> flatmapStage; + if (!innerConnections.empty()) { + flatmapStage = DqPushFlatMapInnerConnectionsToStageInput(flatmap, std::move(innerConnections), ctx); + if (!flatmapStage) { + return node; + } + } else { + if (auto connToPushableStage = DqBuildPushableStage(dqUnion, ctx)) { + return TExprBase(ctx.ChangeChild(*node.Raw(), TCoFlatMapBase::idx_Input, std::move(connToPushableStage))); + } - auto flatmapStage = Build<TDqStage>(ctx, flatmap.Pos()) - .Inputs() - .Add<TDqCnMap>() - .Output(dqUnion.Output()) + auto lambda = TCoLambda(ctx.Builder(flatmap.Lambda().Pos()) + .Lambda() + .Param("stream") + .Callable(flatmap.Ref().Content()) + .Arg(0, "stream") + .Add(1, ctx.DeepCopyLambda(flatmap.Lambda().Ref())) + .Seal() + .Seal().Build()); + + auto pushResult = DqPushLambdaToStageUnionAll(dqUnion, lambda, {}, ctx, optCtx); + if (pushResult) { + return pushResult.Cast(); + } + + flatmapStage = Build<TDqStage>(ctx, flatmap.Pos()) + .Inputs() + .Add<TDqCnMap>() + .Output(dqUnion.Output()) + .Build() .Build() - .Build() - .Program(lambda) - .Settings(TDqStageSettings().BuildNode(ctx, flatmap.Pos())) - .Done(); + .Program(lambda) + .Settings(TDqStageSettings().BuildNode(ctx, flatmap.Pos())) + .Done(); + } return Build<TDqCnUnionAll>(ctx, node.Pos()) .Output() - .Stage(flatmapStage) + .Stage(flatmapStage.Cast()) .Index().Build("0") .Build() .Done(); diff --git a/ydb/library/yql/providers/common/mkql/parser.cpp b/ydb/library/yql/providers/common/mkql/parser.cpp index d2ede218e67..4de60bd7025 100644 --- a/ydb/library/yql/providers/common/mkql/parser.cpp +++ b/ydb/library/yql/providers/common/mkql/parser.cpp @@ -101,35 +101,70 @@ TString ResolveUDFNameByCompression(std::string_view input) { } THROW yexception() << "Invalid compression: " << input; } + +TRuntimeNode WrapWithDecompress( + TRuntimeNode input, + const TType* inputItemType, + const std::string_view& compression, + NCommon::TMkqlBuildContext& ctx) +{ + // If input has one field, this field is data + if (!inputItemType->IsTuple()) { + return ctx.ProgramBuilder.Map(input, [&ctx, &compression](TRuntimeNode item) { + return ctx.ProgramBuilder.Apply( + ctx.ProgramBuilder.Udf(std::string("Decompress.") += ResolveUDFNameByCompression(compression)), + {item}); + }); + } + + // If input has multiple fields, decompress only "data" field. + const auto* inputItemTuple = static_cast<const TTupleType*>(inputItemType); + return ctx.ProgramBuilder.Map(input, [&](TRuntimeNode item) { + const auto dataMember = ctx.ProgramBuilder.Nth(item, 0); + const auto decompress = ctx.ProgramBuilder.Apply( + ctx.ProgramBuilder.Udf(std::string("Decompress.") += ResolveUDFNameByCompression(compression)), + {dataMember}); + + std::vector<TRuntimeNode> res; + res.emplace_back(decompress); + for (auto i = 1U; i < inputItemTuple->GetElementsCount(); i++) { + res.emplace_back(ctx.ProgramBuilder.Nth(item, i)); + } + + return ctx.ProgramBuilder.NewTuple(res); + }); +} } // namespace TRuntimeNode BuildParseCall( TPosition pos, TRuntimeNode input, TMaybe<TRuntimeNode> extraColumnsByPathIndex, + std::unordered_map<TString, ui32>&& metadataColumns, const std::string_view& format, const std::string_view& compression, TType* inputType, - TType* outputItemType, + TType* parseItemType, TType* finalItemType, NCommon::TMkqlBuildContext& ctx) { + const auto* inputItemType = static_cast<TStreamType*>(inputType)->GetItemType(); + const auto* parseItemStructType = static_cast<TStructType*>(parseItemType); + const auto* finalItemStructType = static_cast<TStructType*>(finalItemType); + if (!compression.empty()) { - input = ctx.ProgramBuilder.Map(input, [&ctx, &compression](TRuntimeNode item) { - return ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf(std::string("Decompress.") += ResolveUDFNameByCompression(compression)), {item}); - }); + input = WrapWithDecompress(input, inputItemType, compression, ctx); } - const auto structType = static_cast<const TStructType*>(outputItemType); if (format == "raw") { - MKQL_ENSURE(1U >= structType->GetMembersCount(), "Expected at most one column."); auto parseLambda = [&](TRuntimeNode item) { - if (structType->GetMembersCount() == 0) { - return ctx.ProgramBuilder.NewStruct(outputItemType, {}); + if (parseItemStructType->GetMembersCount() == 0) { + return ctx.ProgramBuilder.NewStruct(parseItemType, {}); } bool isOptional; - const auto schemeType = UnpackOptionalData(structType->GetMemberType(0U), isOptional)->GetSchemeType(); + const auto schemeType = UnpackOptionalData( + parseItemStructType->GetMemberType(0U), isOptional)->GetSchemeType(); TRuntimeNode converted; if (NUdf::TDataType<const char*>::Id == schemeType) { @@ -140,37 +175,62 @@ TRuntimeNode BuildParseCall( ctx.ProgramBuilder.StrictFromString(item, type); } - return ctx.ProgramBuilder.NewStruct(outputItemType, {{structType->GetMemberName(0), converted }}); + return ctx.ProgramBuilder.NewStruct(parseItemType, {{parseItemStructType->GetMemberName(0), converted }}); }; input = ctx.ProgramBuilder.Map(ctx.ProgramBuilder.ToFlow(input), [&](TRuntimeNode item) { - if (extraColumnsByPathIndex) { + std::vector<TRuntimeNode> res; + + if (extraColumnsByPathIndex || !metadataColumns.empty()) { auto data = ctx.ProgramBuilder.Nth(item, 0); - auto pathInd = ctx.ProgramBuilder.Nth(item, 1); - return ctx.ProgramBuilder.NewTuple({ parseLambda(data), pathInd }); + TMaybe<TRuntimeNode> pathInd; + res.emplace_back(parseLambda(data)); + if (extraColumnsByPathIndex) { + res.emplace_back(ctx.ProgramBuilder.Nth(item, res.size())); + } + for (auto i = 0U; i < metadataColumns.size(); i++) { + res.emplace_back(ctx.ProgramBuilder.Nth(item, res.size())); + } + return ctx.ProgramBuilder.NewTuple(res); } return parseLambda(item); } ); } else if (format == "json_list") { auto parseToListLambda = [&](TRuntimeNode blob) { - const auto json = ctx.ProgramBuilder.StrictFromString(blob, ctx.ProgramBuilder.NewDataType(NUdf::TDataType<NUdf::TJson>::Id)); - const auto dom = ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("Yson2.ParseJson"), {json}); - const auto userType = ctx.ProgramBuilder.NewTupleType({ctx.ProgramBuilder.NewTupleType({dom.GetStaticType()}), ctx.ProgramBuilder.NewStructType({}), ctx.ProgramBuilder.NewListType(outputItemType)}); - return ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("Yson2.ConvertTo", {}, userType), {dom}); + const auto json = ctx.ProgramBuilder.StrictFromString( + blob, + ctx.ProgramBuilder.NewDataType(NUdf::TDataType<NUdf::TJson>::Id)); + const auto dom = ctx.ProgramBuilder.Apply( + ctx.ProgramBuilder.Udf("Yson2.ParseJson"), + {json}); + const auto userType = ctx.ProgramBuilder.NewTupleType({ + ctx.ProgramBuilder.NewTupleType({dom.GetStaticType()}), + ctx.ProgramBuilder.NewStructType({}), + ctx.ProgramBuilder.NewListType(parseItemType)}); + return ctx.ProgramBuilder.Apply( + ctx.ProgramBuilder.Udf("Yson2.ConvertTo", {}, userType), + {dom}); }; input = ctx.ProgramBuilder.FlatMap(ctx.ProgramBuilder.ToFlow(input), [&](TRuntimeNode blob) { TRuntimeNode parsedList; - if (extraColumnsByPathIndex) { + if (extraColumnsByPathIndex || !metadataColumns.empty()) { auto data = ctx.ProgramBuilder.Nth(blob, 0); - auto pathInd = ctx.ProgramBuilder.Nth(blob, 1); parsedList = ctx.ProgramBuilder.Map(parseToListLambda(data), [&](TRuntimeNode item) { - return ctx.ProgramBuilder.NewTuple({ item, pathInd }); + std::vector<TRuntimeNode> res; + res.emplace_back(item); + if (extraColumnsByPathIndex) { + res.emplace_back(ctx.ProgramBuilder.Nth(blob, res.size())); + } + for (auto i = 0U; i < metadataColumns.size(); i++) { + res.emplace_back(ctx.ProgramBuilder.Nth(blob, res.size())); + } + return ctx.ProgramBuilder.NewTuple(res); } ); } else { @@ -179,51 +239,66 @@ TRuntimeNode BuildParseCall( return parsedList; }); } else { - TType* userOutputType = outputItemType; - TType* inputDataType = static_cast<TStreamType*>(inputType)->GetItemType(); - if (extraColumnsByPathIndex) { - userOutputType = ctx.ProgramBuilder.NewTupleType({ userOutputType, ctx.ProgramBuilder.NewDataType(NUdf::EDataSlot::Uint64)}); - inputDataType = static_cast<TTupleType*>(inputDataType)->GetElementType(0); + TType* userOutputType = parseItemType; + const TType* inputDataType = inputItemType; + + if (extraColumnsByPathIndex || !metadataColumns.empty()) { + const auto* inputItemTuple = static_cast<const TTupleType*>(inputItemType); + + std::vector<TType*> tupleItems; + tupleItems.reserve(inputItemTuple->GetElementsCount()); + + tupleItems.emplace_back(userOutputType); + for (auto i = 1U; i < inputItemTuple->GetElementsCount(); i++) { + tupleItems.emplace_back(inputItemTuple->GetElementType(i)); + } + + userOutputType = ctx.ProgramBuilder.NewTupleType(tupleItems); + inputDataType = inputItemTuple->GetElementType(0); } - const auto userType = ctx.ProgramBuilder.NewTupleType({ctx.ProgramBuilder.NewTupleType({inputType}), ctx.ProgramBuilder.NewStructType({}), userOutputType}); + + const auto userType = ctx.ProgramBuilder.NewTupleType({ + ctx.ProgramBuilder.NewTupleType({inputType}), + ctx.ProgramBuilder.NewStructType({}), + userOutputType}); input = TType::EKind::Resource == inputDataType->GetKind() ? ctx.ProgramBuilder.ToFlow(ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("ClickHouseClient.ParseBlocks", {}, userType), {input})): ctx.ProgramBuilder.ToFlow(ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("ClickHouseClient.ParseFormat", {}, userType, format), {input})); } - const auto finalStructType = static_cast<const TStructType*>(finalItemType); - if (extraColumnsByPathIndex) { - return ctx.ProgramBuilder.ExpandMap(input, - [&](TRuntimeNode item) { - // find extra columns by path index and combine them with parsed output - auto data = ctx.ProgramBuilder.Nth(item, 0); - auto pathInd = ctx.ProgramBuilder.Nth(item, 1); + return ctx.ProgramBuilder.ExpandMap(input, + [&](TRuntimeNode item) { + auto parsedData = (extraColumnsByPathIndex || !metadataColumns.empty()) + ? ctx.ProgramBuilder.Nth(item, 0) + : item; - auto extra = ctx.ProgramBuilder.Lookup(ctx.ProgramBuilder.ToIndexDict(*extraColumnsByPathIndex), pathInd); - extra = ctx.ProgramBuilder.Unwrap(extra, + TMaybe<TRuntimeNode> extra; + if (extraColumnsByPathIndex) { + auto pathInd = ctx.ProgramBuilder.Nth(item, 1); + auto extraNode = ctx.ProgramBuilder.Lookup(ctx.ProgramBuilder.ToIndexDict(*extraColumnsByPathIndex), pathInd); + extra = ctx.ProgramBuilder.Unwrap(extraNode, ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>("Failed to lookup path index"), pos.File, pos.Row, pos.Column); - - TRuntimeNode::TList fields; - fields.reserve(finalStructType->GetMembersCount()); - for (ui32 i = 0; i < finalStructType->GetMembersCount(); ++i) { - TStringBuf name = finalStructType->GetMemberName(i); - const bool inData = structType->FindMemberIndex(name).Defined(); - fields.push_back(ctx.ProgramBuilder.Member(inData ? data : extra, name)); - } - return fields; } - ); - } - return ctx.ProgramBuilder.ExpandMap(input, - [&](TRuntimeNode item) { TRuntimeNode::TList fields; - fields.reserve(finalStructType->GetMembersCount()); - auto j = 0U; - std::generate_n(std::back_inserter(fields), finalStructType->GetMembersCount(), [&](){ return ctx.ProgramBuilder.Member(item, finalStructType->GetMemberName(j++)); }); + fields.reserve(finalItemStructType->GetMembersCount()); + + for (ui32 i = 0; i < finalItemStructType->GetMembersCount(); ++i) { + TStringBuf name = finalItemStructType->GetMemberName(i); + const auto metadataIter = metadataColumns.find(TString(name)); + if (metadataIter != metadataColumns.end()) { + fields.push_back(ctx.ProgramBuilder.Nth(item, metadataIter->second)); + } else if (parseItemStructType->FindMemberIndex(name).Defined()) { + fields.push_back(ctx.ProgramBuilder.Member(parsedData, name)); + } else { + MKQL_ENSURE(extra, "Column " << name << " wasn't found"); + fields.push_back(ctx.ProgramBuilder.Member(*extra, name)); + } + } return fields; - }); + } + ); } TMaybe<TRuntimeNode> TryWrapWithParser(const TDqSourceWideWrap& wrapper, NCommon::TMkqlBuildContext& ctx) { @@ -233,25 +308,57 @@ TMaybe<TRuntimeNode> TryWrapWithParser(const TDqSourceWideWrap& wrapper, NCommon } const auto input = MkqlBuildExpr(wrapper.Input().Ref(), ctx); - const auto inputItemType = NCommon::BuildType(wrapper.Input().Ref(), *wrapper.Input().Ref().GetTypeAnn(), ctx.ProgramBuilder); + const auto inputType = NCommon::BuildType( + wrapper.Input().Ref(), + *wrapper.Input().Ref().GetTypeAnn(), + ctx.ProgramBuilder); const TStructExprType* rowType = wrapper.RowType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>(); const TStructExprType* parsedType = rowType; TMaybe<TRuntimeNode> extraColumns; + const TStructExprType* extraType = nullptr; if (auto extraColumnsSetting = GetSetting(wrapper.Settings().Cast().Ref(), "extraColumns")) { extraColumns = MkqlBuildExpr(extraColumnsSetting->Tail(), ctx); - const TStructExprType* extraType = extraColumnsSetting->Tail().GetTypeAnn()->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>(); - auto parsedItems = rowType->GetItems(); - EraseIf(parsedItems, [extraType](const auto& item) { return extraType->FindItem(item->GetName()); }); - parsedType = ctx.ExprCtx.MakeType<TStructExprType>(parsedItems); + extraType = extraColumnsSetting->Tail().GetTypeAnn()->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>(); } - const auto outputItemType = NCommon::BuildType(wrapper.RowType().Ref(), *parsedType, ctx.ProgramBuilder); - const auto finalItemType = NCommon::BuildType(wrapper.RowType().Ref(), *rowType, ctx.ProgramBuilder); + std::unordered_map<TString, ui32> metadataColumns; + if (auto metadataSetting = GetSetting(wrapper.Settings().Cast().Ref(), "metadataColumns")) { + for (auto i = 0U; i < metadataSetting->Tail().ChildrenSize(); i++) { + const auto name = TCoAtom(metadataSetting->Tail().Child(i)).StringValue(); + metadataColumns.emplace(name, i + (extraColumns ? 2 : 1)); + } + } + + auto parsedItems = rowType->GetItems(); + EraseIf(parsedItems, [extraType, &metadataColumns](const auto& item) { + return extraType && extraType->FindItem(item->GetName()) || metadataColumns.contains(TString(item->GetName())); + }); + parsedType = ctx.ExprCtx.MakeType<TStructExprType>(parsedItems); + + const auto parseItemType = NCommon::BuildType( + wrapper.RowType().Ref(), + *parsedType, + ctx.ProgramBuilder); + const auto finalItemType = NCommon::BuildType( + wrapper.RowType().Ref(), + *rowType, + ctx.ProgramBuilder); + const auto& settings = GetSettings(wrapper.Settings().Cast().Ref()); TPosition pos = ctx.ExprCtx.GetPosition(wrapper.Pos()); - return BuildParseCall(pos, input, extraColumns, format.Content() + settings.front(), settings.back(), inputItemType, outputItemType, finalItemType, ctx); + return BuildParseCall( + pos, + input, + extraColumns, + std::move(metadataColumns), + format.Content() + settings.front(), + settings.back(), + inputType, + parseItemType, + finalItemType, + ctx); } } diff --git a/ydb/library/yql/providers/common/mkql/parser.h b/ydb/library/yql/providers/common/mkql/parser.h index 83f52328fca..5ddaaa31417 100644 --- a/ydb/library/yql/providers/common/mkql/parser.h +++ b/ydb/library/yql/providers/common/mkql/parser.h @@ -17,10 +17,11 @@ NKikimr::NMiniKQL::TRuntimeNode BuildParseCall( TPosition pos, NKikimr::NMiniKQL::TRuntimeNode input, TMaybe<NKikimr::NMiniKQL::TRuntimeNode> extraColumnsByPathIndex, + std::unordered_map<TString, ui32>&& metadataColumns, const std::string_view& format, const std::string_view& compression, - NKikimr::NMiniKQL::TType* inputItemType, - NKikimr::NMiniKQL::TType* outputItemType, + NKikimr::NMiniKQL::TType* inputType, + NKikimr::NMiniKQL::TType* parseItemType, NKikimr::NMiniKQL::TType* finalItemType, NCommon::TMkqlBuildContext& ctx); diff --git a/ydb/library/yql/providers/pq/async_io/CMakeLists.txt b/ydb/library/yql/providers/pq/async_io/CMakeLists.txt index f4a0cf6b122..0ea9fd0141a 100644 --- a/ydb/library/yql/providers/pq/async_io/CMakeLists.txt +++ b/ydb/library/yql/providers/pq/async_io/CMakeLists.txt @@ -25,6 +25,7 @@ target_link_libraries(providers-pq-async_io PUBLIC providers-pq-proto ) target_sources(providers-pq-async_io PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/pq/async_io/probes.cpp diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.cpp new file mode 100644 index 00000000000..2b4c8dcb5a8 --- /dev/null +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.cpp @@ -0,0 +1,67 @@ +#include "dq_pq_meta_extractor.h" + +#include <optional> + +#include <ydb/library/yql/minikql/mkql_string_util.h> +#include <ydb/library/yql/providers/pq/common/pq_meta_fields.h> +#include <ydb/library/yql/public/udf/udf_data_type.h> +#include <ydb/library/yql/public/udf/udf_value.h> + +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h> + +#include <util/generic/string.h> + +namespace { + const std::unordered_map<TString, NYql::NDq::TPqMetaExtractor::TPqMetaExtractorLambda> ExtractorsMap = { + { + "_yql_sys_create_time", [](const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage& message){ + return NYql::NUdf::TUnboxedValuePod(static_cast<NYql::NUdf::TDataType<NYql::NUdf::TTimestamp>::TLayout>(message.GetCreateTime().MicroSeconds())); + } + }, + { + "_yql_sys_write_time", [](const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage& message){ + return NYql::NUdf::TUnboxedValuePod(static_cast<NYql::NUdf::TDataType<NYql::NUdf::TTimestamp>::TLayout>(message.GetWriteTime().MicroSeconds())); + } + }, + { + "_yql_sys_partition_id", [](const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage& message){ + return NYql::NUdf::TUnboxedValuePod(message.GetPartitionStream()->GetPartitionId()); + } + }, + { + "_yql_sys_offset", [](const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage& message){ + return NYql::NUdf::TUnboxedValuePod(message.GetOffset()); + } + }, + { + "_yql_sys_message_group_id", [](const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage& message){ + const auto& data = message.GetMessageGroupId(); + return NKikimr::NMiniKQL::MakeString(NYql::NUdf::TStringRef(data.Data(), data.Size())); + } + }, + { + "_yql_sys_seq_no", [](const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage& message){ + return NYql::NUdf::TUnboxedValuePod(static_cast<NYql::NUdf::TDataType<NYql::NUdf::TTimestamp>::TLayout>(message.GetSeqNo())); + } + }, + }; +} + +namespace NYql::NDq { + +TPqMetaExtractor::TPqMetaExtractor() { + for (auto key : AllowedPqMetaSysColumns()) { + Y_ENSURE( + ExtractorsMap.contains(key), + "Pq metadata field " << key << " hasn't valid runtime extractor. You should add it."); + } +} + +TPqMetaExtractor::TPqMetaExtractorLambda TPqMetaExtractor::FindExtractorLambda(TString sysColumn) const { + auto iter = ExtractorsMap.find(sysColumn); + Y_ENSURE(iter != ExtractorsMap.end(), sysColumn); + + return iter->second; +} + +} diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.h b/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.h new file mode 100644 index 00000000000..df932bc48f2 --- /dev/null +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.h @@ -0,0 +1,21 @@ +#pragma once + +#include "ydb/library/yql/minikql/mkql_string_util.h" +#include <optional> + +#include <ydb/library/yql/public/udf/udf_data_type.h> +#include <ydb/library/yql/public/udf/udf_value.h> + +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h> + +#include <util/generic/string.h> + +namespace NYql::NDq { + struct TPqMetaExtractor { + using TPqMetaExtractorLambda = std::function<NYql::NUdf::TUnboxedValuePod(const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage&)>; + + public: + TPqMetaExtractor(); + TPqMetaExtractorLambda FindExtractorLambda(TString sysColumn) const; + }; +} diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp index 7d7f29f750b..2c2605d3264 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp @@ -7,11 +7,13 @@ #include <ydb/library/yql/dq/common/dq_common.h> #include <ydb/library/yql/dq/proto/dq_checkpoint.pb.h> -#include <ydb/library/yql/utils/log/log.h> #include <ydb/library/yql/minikql/comp_nodes/mkql_saveload.h> #include <ydb/library/yql/minikql/mkql_alloc.h> #include <ydb/library/yql/minikql/mkql_string_util.h> +#include <ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.h> +#include <ydb/library/yql/providers/pq/common/pq_meta_fields.h> #include <ydb/library/yql/providers/pq/proto/dq_io_state.pb.h> +#include <ydb/library/yql/utils/log/log.h> #include <ydb/library/yql/utils/yql_panic.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h> @@ -114,7 +116,11 @@ public: , StartingMessageTimestamp(TInstant::MilliSeconds(TInstant::Now().MilliSeconds())) // this field is serialized as milliseconds, so drop microseconds part to be consistent with storage , ComputeActorId(computeActorId) { - Y_UNUSED(HolderFactory); + MetadataFields.reserve(SourceParams.MetadataFieldsSize()); + TPqMetaExtractor fieldsExtractor; + for (const auto& fieldName : SourceParams.GetMetadataFields()) { + MetadataFields.emplace_back(fieldName, fieldsExtractor.FindExtractorLambda(fieldName)); + } } NYdb::NPersQueue::TPersQueueClientSettings GetPersQueueClientSettings() const { @@ -316,7 +322,20 @@ private: continue; } - Batch.emplace_back(NKikimr::NMiniKQL::MakeString(NUdf::TStringRef(data.Data(), data.Size()))); + NUdf::TUnboxedValuePod item; + if (Self.MetadataFields.empty()) { + item = NKikimr::NMiniKQL::MakeString(NUdf::TStringRef(data.Data(), data.Size())); + } else { + NUdf::TUnboxedValue* itemPtr; + item = Self.HolderFactory.CreateDirectArrayHolder(Self.MetadataFields.size() + 1, itemPtr); + *(itemPtr++) = NKikimr::NMiniKQL::MakeString(NUdf::TStringRef(data.Data(), data.Size())); + + for (const auto& [name, extractor] : Self.MetadataFields) { + *(itemPtr++) = extractor(message); + } + } + + Batch.emplace_back(item); UsedSpace += data.Size(); } Self.UpdateStateWithNewReadData(event); @@ -373,6 +392,7 @@ private: std::queue<std::pair<ui64, NYdb::NPersQueue::TDeferredCommit>> DeferredCommits; NYdb::NPersQueue::TDeferredCommit CurrentDeferredCommit; bool SubscribedOnEvent = false; + std::vector<std::tuple<TString, TPqMetaExtractor::TPqMetaExtractorLambda>> MetadataFields; }; std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor( diff --git a/ydb/library/yql/providers/pq/common/CMakeLists.txt b/ydb/library/yql/providers/pq/common/CMakeLists.txt index 8abba4751ea..7d4c0b74f09 100644 --- a/ydb/library/yql/providers/pq/common/CMakeLists.txt +++ b/ydb/library/yql/providers/pq/common/CMakeLists.txt @@ -14,7 +14,9 @@ target_compile_options(providers-pq-common PRIVATE target_link_libraries(providers-pq-common PUBLIC contrib-libs-cxxsupp yutil + yql-public-types ) target_sources(providers-pq-common PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/pq/common/pq_meta_fields.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/pq/common/yql_names.cpp ) diff --git a/ydb/library/yql/providers/pq/common/pq_meta_fields.cpp b/ydb/library/yql/providers/pq/common/pq_meta_fields.cpp new file mode 100644 index 00000000000..c0f613f593d --- /dev/null +++ b/ydb/library/yql/providers/pq/common/pq_meta_fields.cpp @@ -0,0 +1,65 @@ +#include "pq_meta_fields.h" +#include <ydb/library/yql/minikql/mkql_string_util.h> + +#include <unordered_map> + +namespace { + const std::vector<NYql::TMetaFieldDescriptor> PqMetaFields = { + NYql::TMetaFieldDescriptor("CreateTime", "_yql_sys_create_time", NYql::NUdf::EDataSlot::Timestamp), + NYql::TMetaFieldDescriptor("WriteTime", "_yql_sys_write_time", NYql::NUdf::EDataSlot::Timestamp), + NYql::TMetaFieldDescriptor("PartitionId", "_yql_sys_partition_id", NYql::NUdf::EDataSlot::Uint64), + NYql::TMetaFieldDescriptor("Offset", "_yql_sys_offset", NYql::NUdf::EDataSlot::Uint64), + NYql::TMetaFieldDescriptor("MessageGroupId", "_yql_sys_message_group_id", NYql::NUdf::EDataSlot::String), + NYql::TMetaFieldDescriptor("SeqNo", "_yql_sys_seq_no", NYql::NUdf::EDataSlot::Uint64), + }; +} + +namespace NYql { + +const TMetaFieldDescriptor* FindPqMetaFieldDescriptorByCallable(const TString& callableName) { + const auto iter = std::find_if( + PqMetaFields.begin(), + PqMetaFields.end(), + [&](const NYql::TMetaFieldDescriptor& item){ return item.CallableName == callableName; }); + if (iter != PqMetaFields.end()) { + return iter; + } + + return nullptr; +} + +const TMetaFieldDescriptor* FindPqMetaFieldDescriptorBySysColumn(const TString& sysColumn) { + const auto iter = std::find_if( + PqMetaFields.begin(), + PqMetaFields.end(), + [&](const NYql::TMetaFieldDescriptor& item){ return item.SysColumn == sysColumn; }); + if (iter != PqMetaFields.end()) { + return iter; + } + + return nullptr; +} + +std::vector<TString> AllowedPqMetaSysColumns() { + std::vector<TString> res; + res.reserve(PqMetaFields.size()); + + for (const auto& descriptor : PqMetaFields) { + res.emplace_back(descriptor.SysColumn); + } + + return res; +} + +std::vector<TString> AllowedPqMetaCallables() { + std::vector<TString> res; + res.reserve(PqMetaFields.size()); + + for (const auto& descriptor : PqMetaFields) { + res.emplace_back(descriptor.CallableName); + } + + return res; +} + +} diff --git a/ydb/library/yql/providers/pq/common/pq_meta_fields.h b/ydb/library/yql/providers/pq/common/pq_meta_fields.h new file mode 100644 index 00000000000..ce73ca5ccce --- /dev/null +++ b/ydb/library/yql/providers/pq/common/pq_meta_fields.h @@ -0,0 +1,36 @@ +#pragma once + +#include <optional> + +#include <ydb/library/yql/public/udf/udf_data_type.h> +#include <ydb/library/yql/public/udf/udf_value.h> + +#include <util/generic/string.h> +#include <util/string/builder.h> + + +namespace NYql { + +struct TMetaFieldDescriptor { +public: + TMetaFieldDescriptor(TString callableName, TString sysColumn, NUdf::EDataSlot type) + : CallableName(callableName) + , SysColumn(sysColumn) + , Type(type) + { } + +public: + const TString CallableName; + const TString SysColumn; + const NUdf::EDataSlot Type; +}; + +const TMetaFieldDescriptor* FindPqMetaFieldDescriptorByCallable(const TString& callableName); + +const TMetaFieldDescriptor* FindPqMetaFieldDescriptorBySysColumn(const TString& sysColumn); + +std::vector<TString> AllowedPqMetaSysColumns(); + +std::vector<TString> AllowedPqMetaCallables(); + +} diff --git a/ydb/library/yql/providers/pq/proto/dq_io.proto b/ydb/library/yql/providers/pq/proto/dq_io.proto index e87eb72755c..c5c3ebe1d0f 100644 --- a/ydb/library/yql/providers/pq/proto/dq_io.proto +++ b/ydb/library/yql/providers/pq/proto/dq_io.proto @@ -24,6 +24,7 @@ message TDqPqTopicSource { bool UseSsl = 7; bool AddBearerToToken = 8; string DatabaseId = 9; + repeated string MetadataFields = 10; } message TDqPqTopicSink { diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp index 4d43adc7695..ec7623ab155 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp @@ -5,13 +5,14 @@ #include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h> #include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h> #include <ydb/library/yql/dq/opt/dq_opt.h> -#include <ydb/library/yql/providers/pq/common/yql_names.h> -#include <ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.h> #include <ydb/library/yql/providers/common/config/yql_configuration_transformer.h> -#include <ydb/library/yql/providers/common/provider/yql_provider.h> -#include <ydb/library/yql/providers/common/provider/yql_provider_names.h> #include <ydb/library/yql/providers/common/provider/yql_data_provider_impl.h> +#include <ydb/library/yql/providers/common/provider/yql_provider_names.h> +#include <ydb/library/yql/providers/common/provider/yql_provider.h> #include <ydb/library/yql/providers/common/transform/yql_lazy_init.h> +#include <ydb/library/yql/providers/pq/common/pq_meta_fields.h> +#include <ydb/library/yql/providers/pq/common/yql_names.h> +#include <ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.h> #include <ydb/library/yql/utils/log/log.h> @@ -102,13 +103,21 @@ public: return nullptr; } + TVector<TCoNameValueTuple> sourceMetadata; + for (auto sysColumn : AllowedPqMetaSysColumns()) { + sourceMetadata.push_back(Build<TCoNameValueTuple>(ctx, read.Pos()) + .Name().Build("system") + .Value<TCoAtom>().Build(sysColumn) + .Done()); + } + auto topicNode = Build<TPqTopic>(ctx, read.Pos()) .Cluster().Value(cluster).Build() .Database().Value(State_->Configuration->GetDatabaseForTopic(cluster)).Build() .Path().Value(topicKeyParser.GetTopicPath()).Build() .RowSpec(topicMeta->RowSpec) .Props(BuildTopicPropsList(*topicMeta, read.Pos(), ctx)) - .Metadata().Build() + .Metadata().Add(sourceMetadata).Build() .Done(); auto builder = Build<TPqReadTopic>(ctx, read.Pos()) diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp index e2d26fd84cc..e9f4178cd8c 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp @@ -5,6 +5,7 @@ #include <ydb/library/yql/providers/common/provider/yql_provider.h> #include <ydb/library/yql/providers/common/provider/yql_provider_names.h> +#include <ydb/library/yql/providers/pq/common/pq_meta_fields.h> #include <ydb/library/yql/providers/common/provider/yql_data_provider_impl.h> #include <ydb/library/yql/utils/log/log.h> @@ -26,6 +27,7 @@ public: AddHandler({TPqReadTopic::CallableName()}, Hndl(&TSelf::HandleReadTopic)); AddHandler({TPqTopic::CallableName()}, Hndl(&TSelf::HandleTopic)); AddHandler({TDqPqTopicSource::CallableName()}, Hndl(&TSelf::HandleDqTopicSource)); + AddHandler({TCoWriteTime::CallableName(), TCoOffset::CallableName()}, Hndl(&TSelf::HandleMetadata)); } TStatus HandleConfigure(const TExprNode::TPtr& input, TExprContext& ctx) { @@ -45,29 +47,41 @@ public: return TStatus::Ok; } - const TTypeAnnotationNode* GetReadTopicSchema(TPqTopic topic, TMaybeNode<TCoAtomList> columns, TExprBase input, TExprContext& ctx, TVector<TString>& columnOrder) { - auto schema = topic.Ref().GetTypeAnn(); + const TTypeAnnotationNode* GetReadTopicSchema(TPqTopic topic, TMaybeNode<TCoAtomList> columns, TExprContext& ctx, TVector<TString>& columnOrder) { + TVector<const TItemExprType*> items; + items.reserve((columns ? columns.Cast().Ref().ChildrenSize() : 0) + topic.Metadata().Size()); + + const auto* itemSchema = topic.Ref().GetTypeAnn()->Cast<TListExprType>() + ->GetItemType()->Cast<TStructExprType>(); + + std::unordered_set<TString> addedFields; if (columns) { - TVector<const TItemExprType*> items; - items.reserve(columns.Cast().Ref().ChildrenSize()); columnOrder.reserve(items.capacity()); - auto itemSchema = topic.Ref().GetTypeAnn()->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>(); for (auto c : columns.Cast().Ref().ChildrenList()) { if (!EnsureAtom(*c, ctx)) { return nullptr; } auto index = itemSchema->FindItem(c->Content()); if (!index) { - ctx.AddError(TIssue(ctx.GetPosition(input.Pos()), TStringBuilder() << "Unable to find column: " << c->Content())); + ctx.AddError(TIssue(ctx.GetPosition(topic.Pos()), TStringBuilder() << "Unable to find column: " << c->Content())); return nullptr; } columnOrder.push_back(TString(c->Content())); items.push_back(itemSchema->GetItems()[*index]); + addedFields.emplace(c->Content()); } - schema = ctx.MakeType<TListExprType>(ctx.MakeType<TStructExprType>(items)); } - return schema; + + for (auto c : itemSchema->GetItems()) { + if (addedFields.contains(TString(c->GetName()))) { + continue; + } + + items.push_back(c); + } + + return ctx.MakeType<TListExprType>(ctx.MakeType<TStructExprType>(items)); } TStatus HandleReadTopic(TExprBase input, TExprContext& ctx) { @@ -99,7 +113,7 @@ public: } TVector<TString> columnOrder; - auto schema = GetReadTopicSchema(topic, read.Columns().Maybe<TCoAtomList>(), input, ctx, columnOrder); + auto schema = GetReadTopicSchema(topic, read.Columns().Maybe<TCoAtomList>(), ctx, columnOrder); if (!schema) { return TStatus::Error; } @@ -131,7 +145,23 @@ public: return TStatus::Error; } - input.Ptr()->SetTypeAnn(ctx.MakeType<TStreamExprType>(ctx.MakeType<TDataExprType>(EDataSlot::String))); + if (topic.Metadata().Empty()) { + input.Ptr()->SetTypeAnn(ctx.MakeType<TStreamExprType>(ctx.MakeType<TDataExprType>(EDataSlot::String))); + return TStatus::Ok; + } + + TTypeAnnotationNode::TListType tupleItems; + tupleItems.reserve(topic.Metadata().Size() + 1); + + tupleItems.emplace_back(ctx.MakeType<TDataExprType>(EDataSlot::String)); + for (const auto metadataField : topic.Metadata()) { + const auto metadataSysColumn = metadataField.Value().Maybe<TCoAtom>().Cast().StringValue(); + const auto* descriptor = FindPqMetaFieldDescriptorBySysColumn(metadataSysColumn); + Y_ENSURE(descriptor); + tupleItems.emplace_back(ctx.MakeType<TDataExprType>(descriptor->Type)); + } + + input.Ptr()->SetTypeAnn(ctx.MakeType<TStreamExprType>(ctx.MakeType<TTupleExprType>(tupleItems))); return TStatus::Ok; } @@ -141,7 +171,72 @@ public: } TPqTopic topic(input); - input->SetTypeAnn(ctx.MakeType<TListExprType>(topic.RowSpec().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>())); + TVector<const TItemExprType*> outputItems; + + auto rowSchema = topic.RowSpec().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>(); + for (const auto& rowSchemaItem : rowSchema->GetItems()) { + outputItems.push_back(rowSchemaItem); + } + + for (auto nameValue : topic.Metadata()) { + const auto metadataSysColumn = nameValue.Value().Maybe<TCoAtom>().Cast().StringValue(); + const auto* descriptor = FindPqMetaFieldDescriptorBySysColumn(metadataSysColumn); + Y_ENSURE(descriptor); + outputItems.emplace_back(ctx.MakeType<TItemExprType>(descriptor->SysColumn, ctx.MakeType<TDataExprType>(descriptor->Type))); + } + + const auto* itemType = ctx.MakeType<TStructExprType>(outputItems); + input->SetTypeAnn(ctx.MakeType<TListExprType>(itemType)); + return TStatus::Ok; + } + + TStatus HandleMetadata(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { + const auto* descriptor = FindPqMetaFieldDescriptorByCallable(TString(input->Content())); + + if (!EnsureDependsOn(input->Head(), ctx)) { + return IGraphTransformer::TStatus::Error; + } + + auto depOn = input->Head().HeadPtr(); + if (!EnsureStructType(*depOn, ctx)) { + return IGraphTransformer::TStatus::Error; + } + + if (depOn->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Struct) { + auto structType = depOn->GetTypeAnn()->Cast<TStructExprType>(); + if (auto pos = structType->FindItem(descriptor->SysColumn)) { + bool isOptional = false; + const TDataExprType* dataType = nullptr; + if (!EnsureDataOrOptionalOfData(depOn->Pos(), structType->GetItems()[*pos]->GetItemType(), isOptional, dataType, ctx)) { + return IGraphTransformer::TStatus::Error; + } + + if (!EnsureSpecificDataType(depOn->Pos(), *dataType, descriptor->Type, ctx)) { + return IGraphTransformer::TStatus::Error; + } + + output = ctx.Builder(input->Pos()) + .Callable("Member") + .Add(0, depOn) + .Atom(1, descriptor->SysColumn, TNodeFlags::Default) + .Seal() + .Build(); + + if (isOptional) { + output = ctx.Builder(input->Pos()) + .Callable("Coalesce") + .Add(0, output) + .Callable(1, "String") + .Atom(0, "0", TNodeFlags::Default) + .Seal() + .Seal() + .Build(); + } + return IGraphTransformer::TStatus::Repeat; + } + } + + input->SetTypeAnn(ctx.MakeType<TDataExprType>(descriptor->Type)); return TStatus::Ok; } diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp index 237f0d82389..e4620cb2a8e 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp @@ -4,14 +4,15 @@ #include <ydb/library/yql/ast/yql_expr.h> #include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h> -#include <ydb/library/yql/utils/log/log.h> #include <ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h> #include <ydb/library/yql/providers/dq/common/yql_dq_settings.h> #include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h> +#include <ydb/library/yql/providers/pq/common/pq_meta_fields.h> #include <ydb/library/yql/providers/pq/common/yql_names.h> #include <ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.h> #include <ydb/library/yql/providers/pq/proto/dq_io.pb.h> #include <ydb/library/yql/providers/pq/proto/dq_task_params.pb.h> +#include <ydb/library/yql/utils/log/log.h> #include <util/string/builder.h> @@ -72,37 +73,49 @@ public: if (const auto& maybePqReadTopic = TMaybeNode<TPqReadTopic>(read)) { const auto& pqReadTopic = maybePqReadTopic.Cast(); - const auto rowType = pqReadTopic.Ref().GetTypeAnn()->Cast<TTupleExprType>()->GetItems().back()->Cast<TListExprType>()->GetItemType(); + const auto rowType = pqReadTopic.Ref().GetTypeAnn() + ->Cast<TTupleExprType>()->GetItems().back()->Cast<TListExprType>() + ->GetItemType()->Cast<TStructExprType>(); const auto& clusterName = pqReadTopic.DataSource().Cluster().StringValue(); - TExprNode::TListType settings(1U, - ctx.Builder(pqReadTopic.Topic().Pos()) - .List() - .Atom(0, "format", TNodeFlags::Default) - .Add(1, pqReadTopic.Format().Ptr()) - .Seal().Build() - ); + TVector<TCoNameValueTuple> settings; + settings.push_back(Build<TCoNameValueTuple>(ctx, pqReadTopic.Pos()) + .Name().Build("format") + .Value(pqReadTopic.Format()) + .Done()); + TVector<TCoNameValueTuple> innerSettings; if (pqReadTopic.Compression() != "") { - settings.emplace_back( - ctx.Builder(pqReadTopic.Compression().Pos()) - .List() - .Atom(0, "settings", TNodeFlags::Default) - .List(1) - .List(0) - .Atom(0, "compression") - .Atom(1, pqReadTopic.Compression()) - .Seal() - .Seal() - .Seal().Build() - ); + innerSettings.push_back(Build<TCoNameValueTuple>(ctx, pqReadTopic.Pos()) + .Name().Build("compression") + .Value(pqReadTopic.Compression()) + .Done()); + } + + if (!innerSettings.empty()) { + settings.push_back(Build<TCoNameValueTuple>(ctx, pqReadTopic.Pos()) + .Name().Build("settings") + .Value<TCoNameValueTupleList>() + .Add(innerSettings) + .Build() + .Done()); } + TExprNode::TListType metadataFieldsList; + for (auto sysColumn : AllowedPqMetaSysColumns()) { + metadataFieldsList.push_back(ctx.NewAtom(pqReadTopic.Pos(), sysColumn)); + } + + settings.push_back(Build<TCoNameValueTuple>(ctx, pqReadTopic.Pos()) + .Name().Build("metadataColumns") + .Value(ctx.NewList(pqReadTopic.Pos(), std::move(metadataFieldsList))) + .Done()); + const auto token = "cluster:default_" + clusterName; auto columns = pqReadTopic.Columns().Ptr(); if (!columns->IsList()) { const auto pos = columns->Pos(); - const auto& items = rowType->Cast<TStructExprType>()->GetItems(); + const auto& items = rowType->GetItems(); TExprNode::TListType cols; cols.reserve(items.size()); std::transform(items.cbegin(), items.cend(), std::back_inserter(cols), [&](const TItemExprType* item) { return ctx.NewAtom(pos, item->GetName()); }); @@ -120,7 +133,7 @@ public: .Build() .RowType(ExpandType(pqReadTopic.Pos(), *rowType, ctx)) .DataSource(pqReadTopic.DataSource().Cast<TCoDataSource>()) - .Settings(ctx.NewList(pqReadTopic.Topic().Pos(), std::move(settings))) + .Settings(Build<TCoNameValueTupleList>(ctx, read->Pos()).Add(settings).Done()) .Done().Ptr(); } return read; @@ -198,6 +211,10 @@ public: YQL_ENSURE(srcDesc.GetConsumerName(), "No consumer specified for PersQueue cluster"); } + for (const auto metadata : topic.Metadata()) { + srcDesc.AddMetadataFields(metadata.Value().Maybe<TCoAtom>().Cast().StringValue()); + } + protoSettings.PackFrom(srcDesc); sourceType = "PqSource"; } diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_load_meta.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_load_meta.cpp index 51f2a1ce172..35d293d6f78 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_load_meta.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_load_meta.cpp @@ -70,7 +70,6 @@ public: return TStatus::Error; } - meta.RawFormat = (meta.RowSpec == nullptr); if (!meta.RowSpec) { meta.RowSpec = ExpandType(meta.Pos, *itemType, ctx); } diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_logical_opt.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_logical_opt.cpp index 6e8a6462d96..c665badb40e 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_logical_opt.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_logical_opt.cpp @@ -1,12 +1,14 @@ #include "yql_pq_provider_impl.h" -#include <ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.h> -#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h> -#include <ydb/library/yql/providers/common/provider/yql_provider.h> -#include <ydb/library/yql/providers/common/provider/yql_provider_names.h> +#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h> +#include <ydb/library/yql/core/yql_type_helpers.h> #include <ydb/library/yql/providers/common/provider/yql_data_provider_impl.h> +#include <ydb/library/yql/providers/common/provider/yql_provider_names.h> +#include <ydb/library/yql/providers/common/provider/yql_provider.h> #include <ydb/library/yql/providers/common/transform/yql_optimize.h> -#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h> +#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h> +#include <ydb/library/yql/providers/pq/common/pq_meta_fields.h> +#include <ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.h> #include <ydb/library/yql/utils/log/log.h> @@ -16,6 +18,102 @@ using namespace NNodes; namespace { +std::unordered_set<TString> GetUsedMetadataFields(const TCoExtractMembers& extract) { + std::unordered_set<TString> usedMetadataFields; + for (const auto extractMember : extract.Members()) { + if (FindPqMetaFieldDescriptorBySysColumn(extractMember.StringValue())) { + usedMetadataFields.emplace(extractMember.StringValue()); + } + } + + return usedMetadataFields; +} + +TVector<TCoNameValueTuple> DropUnusedMetadata(const TPqTopic& pqTopic, const std::unordered_set<TString>& usedMetadataFields) { + TVector<TCoNameValueTuple> newSourceMetadata; + for (auto metadataItem : pqTopic.Metadata()) { + auto metadataName = metadataItem.Cast<TCoNameValueTuple>().Value().Maybe<TCoAtom>().Cast().StringValue(); + if (usedMetadataFields.contains(metadataName)) { + newSourceMetadata.push_back(metadataItem); + } + } + + return newSourceMetadata; +} + +TCoNameValueTupleList DropUnusedMetadataFromDqWrapSettings( + const TDqSourceWrap& dqSourceWrap, + const TVector<TCoNameValueTuple>& newSourceMetadata, + TExprContext& ctx) +{ + TVector<TCoNameValueTuple> newSettings; + for (const auto settingItem : dqSourceWrap.Settings().Maybe<TCoNameValueTupleList>().Cast()) { + if (settingItem.Name() == "metadataColumns") { + std::vector<TExprNode::TPtr> newMetadataColumns; + newMetadataColumns.reserve(newSourceMetadata.size()); + + for (auto metadataName : newSourceMetadata) { + newMetadataColumns.push_back(ctx.NewAtom( + dqSourceWrap.Pos(), + metadataName.Value().Maybe<TCoAtom>().Cast().StringValue())); + } + + if (!newMetadataColumns.empty()) { + newSettings.push_back(Build<TCoNameValueTuple>(ctx, dqSourceWrap.Pos()) + .Name().Build("metadataColumns") + .Value(ctx.NewList(dqSourceWrap.Pos(), std::move(newMetadataColumns))) + .Done()); + } + + continue; + } + + newSettings.push_back(settingItem); + } + + return Build<TCoNameValueTupleList>(ctx, dqSourceWrap.Pos()) + .Add(std::move(newSettings)) + .Done(); +} + +TExprNode::TPtr DropUnusedMetadataFieldsFromRowType( + TPositionHandle position, + const TStructExprType* oldRowType, + const std::unordered_set<TString>& usedMetadataFields, + TExprContext& ctx) +{ + TVector<const TItemExprType*> newFields; + newFields.reserve(oldRowType->GetSize()); + + for (auto itemExprType : oldRowType->GetItems()) { + const auto columnName = TString(itemExprType->GetName()); + if (FindPqMetaFieldDescriptorBySysColumn(columnName) && !usedMetadataFields.contains(columnName)) { + continue; + } + + newFields.push_back(itemExprType); + } + + return ExpandType(position, *ctx.MakeType<TStructExprType>(newFields), ctx); +} + +TExprNode::TPtr DropUnusedMetadataFieldsFromColumns( + TExprBase oldColumns, + const std::unordered_set<TString>& usedMetadataFields, + TExprContext& ctx) +{ + TExprNode::TListType res; + for (const auto& column : oldColumns.Cast<TCoAtomList>()) { + if (FindPqMetaFieldDescriptorBySysColumn(column.StringValue()) && !usedMetadataFields.contains(column.StringValue())) { + continue; + } + + res.push_back(column.Ptr()); + } + + return ctx.NewList(oldColumns.Pos(), std::move(res)); +} + class TPqLogicalOptProposalTransformer : public TOptimizeTransformerBase { public: TPqLogicalOptProposalTransformer(TPqState::TPtr state) @@ -25,7 +123,7 @@ public: #define HNDL(name) "LogicalOptimizer-"#name, Hndl(&TPqLogicalOptProposalTransformer::name) AddHandler(0, &TCoLeft::Match, HNDL(TrimReadWorld)); // AddHandler(0, &TCoExtractMembers::Match, HNDL(ExtractMembers)); - // AddHandler(0, &TCoExtractMembers::Match, HNDL(ExtractMembersOverDqWrap)); + AddHandler(0, &TCoExtractMembers::Match, HNDL(ExtractMembersOverDqWrap)); #undef HNDL } @@ -37,6 +135,7 @@ public: return TExprBase(ctx.NewWorld(node.Pos())); } + /* TMaybeNode<TExprBase> ExtractMembers(TExprBase node, TExprContext& ctx) const { const auto& extract = node.Cast<TCoExtractMembers>(); @@ -55,25 +154,62 @@ public: .Columns(extract.Members()) .Build() .Done(); - } + }*/ TMaybeNode<TExprBase> ExtractMembersOverDqWrap(TExprBase node, TExprContext& ctx) const { const auto& extract = node.Cast<TCoExtractMembers>(); const auto& input = extract.Input(); - const auto& read = input.Maybe<TDqReadWrap>().Input().Maybe<TPqReadTopic>(); - if (!read) { + const auto dqSourceWrap = input.Maybe<TDqSourceWrap>(); + const auto dqPqTopicSource = dqSourceWrap.Input().Maybe<TDqPqTopicSource>(); + const auto pqTopic = dqPqTopicSource.Topic().Maybe<TPqTopic>(); + if (!pqTopic) { return node; } - const auto& cast = read.Cast(); - return Build<TDqReadWrap>(ctx, node.Pos()) - .InitFrom(input.Cast<TDqReadWrap>()) - .Input<TPqReadTopic>() - .InitFrom(cast) - .Columns(extract.Members()) - .Build() + const auto usedMetadataFields = GetUsedMetadataFields(extract); + const auto newSourceMetadata = DropUnusedMetadata(pqTopic.Cast(), usedMetadataFields); + if (newSourceMetadata.size() == pqTopic.Metadata().Cast().Size()) { + return node; + } + + const auto oldRowType = pqTopic.Ref().GetTypeAnn() + ->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>(); + + auto newPqTopicSource = Build<TDqPqTopicSource>(ctx, node.Pos()) + .InitFrom(dqPqTopicSource.Cast()) + .Topic<TPqTopic>() + .InitFrom(pqTopic.Cast()) + .Metadata().Add(newSourceMetadata).Build() + .Build(); + + if (dqPqTopicSource.Columns()) { + auto newColumns = DropUnusedMetadataFieldsFromColumns( + dqPqTopicSource.Columns().Cast(), + usedMetadataFields, + ctx); + newPqTopicSource.Columns(newColumns); + } + + const auto newDqSourceWrap = Build<TDqSourceWrap>(ctx, node.Pos()) + .InitFrom(dqSourceWrap.Cast()) + .Input(newPqTopicSource.Done()) + .Settings(DropUnusedMetadataFromDqWrapSettings( + dqSourceWrap.Cast(), + newSourceMetadata, + ctx)) + .RowType(DropUnusedMetadataFieldsFromRowType( + node.Pos(), + oldRowType, + usedMetadataFields, + ctx)) + .Done() + .Ptr(); + + return Build<TCoExtractMembers>(ctx, node.Pos()) + .InitFrom(extract) + .Input(ctx.ReplaceNode(input.Ptr(), dqSourceWrap.Ref(), newDqSourceWrap)) .Done(); - }*/ + } private: TPqState::TPtr State_; diff --git a/ydb/library/yql/sql/v1/builtin.cpp b/ydb/library/yql/sql/v1/builtin.cpp index 0452bdea916..27c8e76cb81 100644 --- a/ydb/library/yql/sql/v1/builtin.cpp +++ b/ydb/library/yql/sql/v1/builtin.cpp @@ -2911,6 +2911,14 @@ struct TBuiltinFuncData { {"tablerows", BuildSimpleBuiltinFactoryCallback<TTableRows>() }, {"weakfield", BuildSimpleBuiltinFactoryCallback<TWeakFieldOp>()}, + // meta fields + {"writetime", BuildNamedBuiltinFactoryCallback<TCallDirectRow>("WriteTime")}, + {"offset", BuildNamedBuiltinFactoryCallback<TCallDirectRow>("Offset")}, + // {"createtime", BuildNamedBuiltinFactoryCallback<TCallDirectRow>("CreateTime")}, + // {"partitionid", BuildNamedBuiltinFactoryCallback<TCallDirectRow>("PartitionId")}, + // {"messagegroupid", BuildNamedBuiltinFactoryCallback<TCallDirectRow>("MessageGroupId")}, + // {"seqno", BuildNamedBuiltinFactoryCallback<TCallDirectRow>("SeqNo")}, + // Hint builtins {"grouping", BuildSimpleBuiltinFactoryCallback<TGroupingNode>()}, diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.cpp b/ydb/services/persqueue_v1/actors/write_session_actor.cpp index 6361c9a77c5..bf7078202ff 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.cpp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.cpp @@ -499,9 +499,12 @@ void TWriteSessionActor::Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr &ev return; } + State = ES_WAIT_TABLE_REQUEST_1; + if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { TStringBuilder errorReason; errorReason << "kqp error Marker# PQ53 : " << record; + CloseSession(errorReason, PersQueue::ErrorCode::ERROR, ctx); return; } @@ -509,7 +512,6 @@ void TWriteSessionActor::Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr &ev Y_VERIFY(!KqpSessionId.empty()); SendSelectPartitionRequest(EncodedSourceId.Hash, FullConverter->GetClientsideName(), ctx); - State = ES_WAIT_TABLE_REQUEST_1; } void TWriteSessionActor::SendSelectPartitionRequest(ui32 hash, const TString& topic, const NActors::TActorContext& ctx) { |