aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexey Ozeritskiy <aozeritsky@gmail.com>2022-06-24 10:18:36 +0300
committerAlexey Ozeritskiy <aozeritsky@gmail.com>2022-06-24 10:18:36 +0300
commita4c1572074a6f4eb937168c283a61c43468d8650 (patch)
treef8fdaf6d6859ccf69b9ac6663b1f643590ff6ecd
parent0a71674a38d3db5db3295980b8127d3adc013944 (diff)
downloadydb-a4c1572074a6f4eb937168c283a61c43468d8650.tar.gz
Use Inflight istead of Queue.size
ref:0264a806a0447edbdc4533404dd0129796837904
-rw-r--r--ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp9
1 files changed, 8 insertions, 1 deletions
diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
index cf0f58785e..d473b7308e 100644
--- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
+++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
@@ -131,8 +131,9 @@ public:
private:
void TryExecuteNext() {
TGuard<TMutex> lock(Mutex);
- if (!Queue.empty() && (!DeterministicMode || Queue.size() == 1)) {
+ if (!Queue.empty() && (!DeterministicMode || Inflight == 0)) {
auto request = std::move(Queue.front()); Queue.pop_front();
+ Inflight++;
lock.Release();
auto weak = weak_from_this();
@@ -146,6 +147,11 @@ private:
}
if (auto ptr = weak.lock()) {
+ {
+ TGuard<TMutex> lock(ptr->Mutex);
+ ptr->Inflight--;
+ }
+
ptr->TryExecuteNext();
}
});
@@ -157,6 +163,7 @@ private:
const bool DeterministicMode;
TMutex Mutex;
TList<TRequest> Queue;
+ int Inflight = 0;
};
class TDqGatewayLocal : public IDqGateway {