aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2022-09-09 16:42:44 +0300
committerilnaz <ilnaz@ydb.tech>2022-09-09 16:42:44 +0300
commit8e4ae907ec6bf61b8a0dc0ad0ed61fc512559ea2 (patch)
treec61e6d8a5b82809f6698fec09bb91bb551a27017
parentc9d949de0a10fb0ac657d5cf8046f1729cdbe282 (diff)
downloadydb-8e4ae907ec6bf61b8a0dc0ad0ed61fc512559ea2.tar.gz
Metering fixes
-rw-r--r--ydb/core/metering/CMakeLists.txt1
-rw-r--r--ydb/core/metering/stream_ru_calculator.cpp34
-rw-r--r--ydb/core/metering/stream_ru_calculator.h23
-rw-r--r--ydb/core/metering/stream_ru_calculator_ut.cpp34
-rw-r--r--ydb/core/metering/ut/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/metering/ut/CMakeLists.linux.txt1
-rw-r--r--ydb/services/lib/actors/CMakeLists.txt1
-rw-r--r--ydb/services/lib/actors/pq_rl_helpers.cpp14
-rw-r--r--ydb/services/lib/actors/pq_rl_helpers.h5
9 files changed, 100 insertions, 14 deletions
diff --git a/ydb/core/metering/CMakeLists.txt b/ydb/core/metering/CMakeLists.txt
index 8768953bca..9d767ff495 100644
--- a/ydb/core/metering/CMakeLists.txt
+++ b/ydb/core/metering/CMakeLists.txt
@@ -22,6 +22,7 @@ target_link_libraries(ydb-core-metering PUBLIC
target_sources(ydb-core-metering PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/metering/bill_record.cpp
${CMAKE_SOURCE_DIR}/ydb/core/metering/metering.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/metering/stream_ru_calculator.cpp
)
generate_enum_serilization(ydb-core-metering
${CMAKE_SOURCE_DIR}/ydb/core/metering/bill_record.h
diff --git a/ydb/core/metering/stream_ru_calculator.cpp b/ydb/core/metering/stream_ru_calculator.cpp
new file mode 100644
index 0000000000..ab245838f7
--- /dev/null
+++ b/ydb/core/metering/stream_ru_calculator.cpp
@@ -0,0 +1,34 @@
+#include "stream_ru_calculator.h"
+
+namespace NKikimr::NMetering {
+
+TStreamRequestUnitsCalculator::TStreamRequestUnitsCalculator(ui64 blockSize)
+ : BlockSize(blockSize)
+ , Remainder(blockSize)
+{
+}
+
+ui64 TStreamRequestUnitsCalculator::CalcConsumption(ui64 payloadSize) {
+ if (!payloadSize) {
+ return 0;
+ }
+
+ if (payloadSize > Remainder) {
+ payloadSize -= Remainder;
+
+ const ui64 nBlocks = payloadSize / BlockSize;
+ payloadSize -= BlockSize * nBlocks;
+
+ Remainder = BlockSize - payloadSize;
+ return nBlocks + ui64(bool(payloadSize));
+ } else {
+ Remainder -= payloadSize;
+ return 0;
+ }
+}
+
+ui64 TStreamRequestUnitsCalculator::GetRemainder() const {
+ return Remainder;
+}
+
+} // NKikimr::NMetering
diff --git a/ydb/core/metering/stream_ru_calculator.h b/ydb/core/metering/stream_ru_calculator.h
new file mode 100644
index 0000000000..6c6d56e905
--- /dev/null
+++ b/ydb/core/metering/stream_ru_calculator.h
@@ -0,0 +1,23 @@
+#pragma once
+
+#include <util/generic/fwd.h>
+
+namespace NKikimr::NMetering {
+
+class TStreamRequestUnitsCalculator {
+public:
+ // Remainder = blockSize on init
+ explicit TStreamRequestUnitsCalculator(ui64 blockSize);
+
+ // Returns consumption in terms of RUs (one block is one RU) and updates remainder
+ ui64 CalcConsumption(ui64 payloadSize);
+
+ ui64 GetRemainder() const;
+
+private:
+ const ui64 BlockSize;
+ ui64 Remainder; // remainder in the last block
+
+};
+
+} // NKikimr::NMetering
diff --git a/ydb/core/metering/stream_ru_calculator_ut.cpp b/ydb/core/metering/stream_ru_calculator_ut.cpp
new file mode 100644
index 0000000000..b4d222488c
--- /dev/null
+++ b/ydb/core/metering/stream_ru_calculator_ut.cpp
@@ -0,0 +1,34 @@
+#include "stream_ru_calculator.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/generic/size_literals.h>
+
+namespace NKikimr::NMetering {
+
+Y_UNIT_TEST_SUITE(TStreamRequestUnitsCalculatorTest) {
+ Y_UNIT_TEST(Basic) {
+ TStreamRequestUnitsCalculator calculator(4_KB);
+ UNIT_ASSERT_VALUES_EQUAL(calculator.GetRemainder(), 4_KB);
+
+ UNIT_ASSERT_VALUES_EQUAL(calculator.CalcConsumption(0), 0);
+ UNIT_ASSERT_VALUES_EQUAL(calculator.GetRemainder(), 4_KB);
+
+ UNIT_ASSERT_VALUES_EQUAL(calculator.CalcConsumption(1), 0);
+ UNIT_ASSERT_VALUES_EQUAL(calculator.GetRemainder(), 4_KB - 1);
+
+ UNIT_ASSERT_VALUES_EQUAL(calculator.CalcConsumption(4_KB - 1), 0);
+ UNIT_ASSERT_VALUES_EQUAL(calculator.GetRemainder(), 0);
+
+ UNIT_ASSERT_VALUES_EQUAL(calculator.CalcConsumption(1), 1);
+ UNIT_ASSERT_VALUES_EQUAL(calculator.GetRemainder(), 4_KB - 1);
+
+ UNIT_ASSERT_VALUES_EQUAL(calculator.CalcConsumption(4_KB + 1), 1);
+ UNIT_ASSERT_VALUES_EQUAL(calculator.GetRemainder(), 4_KB - 2);
+
+ UNIT_ASSERT_VALUES_EQUAL(calculator.CalcConsumption(8_KB), 2);
+ UNIT_ASSERT_VALUES_EQUAL(calculator.GetRemainder(), 4_KB - 2);
+ }
+}
+
+} // NKikimr::NMetering
diff --git a/ydb/core/metering/ut/CMakeLists.darwin.txt b/ydb/core/metering/ut/CMakeLists.darwin.txt
index 6007b0d8eb..892e242993 100644
--- a/ydb/core/metering/ut/CMakeLists.darwin.txt
+++ b/ydb/core/metering/ut/CMakeLists.darwin.txt
@@ -27,6 +27,7 @@ target_link_options(ydb-core-metering-ut PRIVATE
CoreFoundation
)
target_sources(ydb-core-metering-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/metering/stream_ru_calculator_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/metering/time_grid_ut.cpp
)
add_test(
diff --git a/ydb/core/metering/ut/CMakeLists.linux.txt b/ydb/core/metering/ut/CMakeLists.linux.txt
index d920d0bf03..0d59db3335 100644
--- a/ydb/core/metering/ut/CMakeLists.linux.txt
+++ b/ydb/core/metering/ut/CMakeLists.linux.txt
@@ -31,6 +31,7 @@ target_link_options(ydb-core-metering-ut PRIVATE
-ldl
)
target_sources(ydb-core-metering-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/metering/stream_ru_calculator_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/metering/time_grid_ut.cpp
)
add_test(
diff --git a/ydb/services/lib/actors/CMakeLists.txt b/ydb/services/lib/actors/CMakeLists.txt
index ff1baa06b0..893dd07437 100644
--- a/ydb/services/lib/actors/CMakeLists.txt
+++ b/ydb/services/lib/actors/CMakeLists.txt
@@ -15,6 +15,7 @@ target_link_libraries(services-lib-actors PUBLIC
cpp-digest-md5
ydb-core-grpc_services
core-grpc_services-base
+ ydb-core-metering
ydb-core-mind
ydb-core-protos
library-persqueue-obfuscate
diff --git a/ydb/services/lib/actors/pq_rl_helpers.cpp b/ydb/services/lib/actors/pq_rl_helpers.cpp
index 2ee5df7238..8a7e8bf5ab 100644
--- a/ydb/services/lib/actors/pq_rl_helpers.cpp
+++ b/ydb/services/lib/actors/pq_rl_helpers.cpp
@@ -5,10 +5,9 @@
namespace NKikimr::NGRpcProxy::V1 {
TRlHelpers::TRlHelpers(NGRpcService::IRequestCtxBase* reqCtx, ui64 blockSize, const TDuration& waitDuration)
- : Request(reqCtx)
- , BlockSize(blockSize)
+ : TStreamRequestUnitsCalculator(blockSize)
+ , Request(reqCtx)
, WaitDuration(waitDuration)
- , PayloadBytes(0)
{
Y_VERIFY(Request);
}
@@ -65,14 +64,7 @@ ui64 TRlHelpers::CalcRuConsumption(ui64 payloadSize) {
return 0;
}
- const ui64 remainder = BlockSize - (PayloadBytes % BlockSize);
- PayloadBytes += payloadSize;
-
- if (payloadSize > remainder) {
- return Max<ui64>(1, (payloadSize - remainder) / BlockSize);
- }
-
- return 0;
+ return CalcConsumption(payloadSize);
}
}
diff --git a/ydb/services/lib/actors/pq_rl_helpers.h b/ydb/services/lib/actors/pq_rl_helpers.h
index 02b1f43698..cf00203399 100644
--- a/ydb/services/lib/actors/pq_rl_helpers.h
+++ b/ydb/services/lib/actors/pq_rl_helpers.h
@@ -1,13 +1,14 @@
#pragma once
#include <ydb/core/grpc_services/local_rate_limiter.h>
+#include <ydb/core/metering/stream_ru_calculator.h>
#include <ydb/core/protos/pqconfig.pb.h>
#include <util/datetime/base.h>
namespace NKikimr::NGRpcProxy::V1 {
-class TRlHelpers {
+class TRlHelpers: public NMetering::TStreamRequestUnitsCalculator {
public:
explicit TRlHelpers(NGRpcService::IRequestCtxBase* reqCtx, ui64 blockSize, const TDuration& waitDuration);
@@ -30,10 +31,8 @@ protected:
private:
NGRpcService::IRequestCtxBase* const Request;
- const ui64 BlockSize;
const TDuration WaitDuration;
- ui64 PayloadBytes;
TActorId RlActor;
TMaybe<NKikimrPQ::TPQTabletConfig::EMeteringMode> MeteringMode;
};