#include "base_impl.h"
#include "converter.h"
#include "threaded.h"
#include "util.h"
#include <thread>

namespace contourpy {
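// The requested thread count is clamped via limit_n_threads() so that it never exceeds the
// number of chunks or the number of hardware threads available.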
ThreadedContourGenerator::ThreadedContourGenerator(
const CoordinateArray& x, const CoordinateArray& y, const CoordinateArray& z,
const MaskArray& mask, bool corner_mask, LineType line_type, FillType fill_type,
bool quad_as_tri, ZInterp z_interp, index_t x_chunk_size, index_t y_chunk_size,
index_t n_threads)
: BaseContourGenerator(x, y, z, mask, corner_mask, line_type, fill_type, quad_as_tri, z_interp,
x_chunk_size, y_chunk_size),
_n_threads(limit_n_threads(n_threads, get_n_chunks())),
_next_chunk(0)
{}

void ThreadedContourGenerator::export_filled(
const ChunkLocal& local, std::vector<py::list>& return_lists)
{
// Reimplementation of SerialContourGenerator::export_filled() to separate out the creation of
// numpy arrays (which requires a thread lock) from the population of those arrays (which does
// not). This minimises the time for which the lock is held.
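// The pattern is: (1) while holding the Lock (which reacquires the GIL), construct the numpy
// arrays and add them to the Python return lists, keeping only their mutable_data() pointers;
// (2) after the Lock has been released, write the point/code/offset data through those
// pointers using the Converter functions.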
assert(local.total_point_count > 0);
switch (get_fill_type())
{
case FillType::OuterCode:
case FillType::OuterOffset: {
assert(!has_direct_points() && !has_direct_line_offsets());
auto outer_count = local.line_count - local.hole_count;
bool outer_code = (get_fill_type() == FillType::OuterCode);
std::vector<PointArray::value_type*> points_ptrs(outer_count);
std::vector<CodeArray::value_type*> codes_ptrs(outer_code ? outer_count : 0);
std::vector<OffsetArray::value_type*> offsets_ptrs(outer_code ? 0 : outer_count);
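// First pass: create the numpy arrays under the Lock and record their raw data pointers.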
{
Lock lock(*this); // cppcheck-suppress unreadVariable
for (decltype(outer_count) i = 0; i < outer_count; ++i) {
auto outer_start = local.outer_offsets.start[i];
auto outer_end = local.outer_offsets.start[i+1];
auto point_start = local.line_offsets.start[outer_start];
auto point_end = local.line_offsets.start[outer_end];
auto point_count = point_end - point_start;
assert(point_count > 2);
index_t points_shape[2] = {static_cast<index_t>(point_count), 2};
PointArray point_array(points_shape);
return_lists[0].append(point_array);
points_ptrs[i] = point_array.mutable_data();
if (outer_code) {
index_t codes_shape = static_cast<index_t>(point_count);
CodeArray code_array(codes_shape);
return_lists[1].append(code_array);
codes_ptrs[i] = code_array.mutable_data();
}
else {
index_t offsets_shape = static_cast<index_t>(outer_end - outer_start + 1);
OffsetArray offset_array(offsets_shape);
return_lists[1].append(offset_array);
offsets_ptrs[i] = offset_array.mutable_data();
}
}
}
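// Second pass: fill the arrays without holding the Lock.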
for (decltype(outer_count) i = 0; i < outer_count; ++i) {
auto outer_start = local.outer_offsets.start[i];
auto outer_end = local.outer_offsets.start[i+1];
auto point_start = local.line_offsets.start[outer_start];
auto point_end = local.line_offsets.start[outer_end];
auto point_count = point_end - point_start;
assert(point_count > 2);
Converter::convert_points(
point_count, local.points.start + 2*point_start, points_ptrs[i]);
if (outer_code)
Converter::convert_codes(
point_count, outer_end - outer_start + 1,
local.line_offsets.start + outer_start, point_start, codes_ptrs[i]);
else
Converter::convert_offsets(
outer_end - outer_start + 1, local.line_offsets.start + outer_start,
point_start, offsets_ptrs[i]);
}
break;
}
case FillType::ChunkCombinedCode:
case FillType::ChunkCombinedCodeOffset: {
assert(has_direct_points() && !has_direct_line_offsets());
// return_lists[0][local.chunk] already contains combined points.
// If ChunkCombinedCodeOffset, return_lists[2][local.chunk] already contains outer
// offsets.
index_t codes_shape = static_cast<index_t>(local.total_point_count);
CodeArray::value_type* codes_ptr = nullptr;
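// Create the codes array under the Lock; it is filled below after the Lock has been released.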
{
Lock lock(*this); // cppcheck-suppress unreadVariable
CodeArray code_array(codes_shape);
return_lists[1][local.chunk] = code_array;
codes_ptr = code_array.mutable_data();
}
Converter::convert_codes(
local.total_point_count, local.line_count + 1, local.line_offsets.start, 0,
codes_ptr);
break;
}
case FillType::ChunkCombinedOffset:
case FillType::ChunkCombinedOffsetOffset:
assert(has_direct_points() && has_direct_line_offsets());
if (get_fill_type() == FillType::ChunkCombinedOffsetOffset) {
assert(has_direct_outer_offsets());
}
// return_lists[0][local.chunk] already contains combined points.
// return_lists[1][local.chunk] already contains line offsets.
// If ChunkCombinedOffsetOffset, return_lists[2][local.chunk] already contains
// outer offsets.
break;
}
}

void ThreadedContourGenerator::export_lines(
const ChunkLocal& local, std::vector<py::list>& return_lists)
{
// Reimplementation of SerialContourGenerator::export_lines() to separate out the creation of
// numpy arrays (which requires a thread lock) from the population of those arrays (which does
// not). This minimises the time for which the lock is held.
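// The same two-pass approach as export_filled() is used: allocate the arrays under the Lock,
// then fill them without it.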
assert(local.total_point_count > 0);
switch (get_line_type())
{
case LineType::Separate:
case LineType::SeparateCode: {
assert(!has_direct_points() && !has_direct_line_offsets());
bool separate_code = (get_line_type() == LineType::SeparateCode);
std::vector<PointArray::value_type*> points_ptrs(local.line_count);
std::vector<CodeArray::value_type*> codes_ptrs(separate_code ? local.line_count : 0);
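// First pass: create the numpy arrays under the Lock and record their raw data pointers.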
{
Lock lock(*this); // cppcheck-suppress unreadVariable
for (decltype(local.line_count) i = 0; i < local.line_count; ++i) {
auto point_start = local.line_offsets.start[i];
auto point_end = local.line_offsets.start[i+1];
auto point_count = point_end - point_start;
assert(point_count > 1);
index_t points_shape[2] = {static_cast<index_t>(point_count), 2};
PointArray point_array(points_shape);
return_lists[0].append(point_array);
points_ptrs[i] = point_array.mutable_data();
if (separate_code) {
index_t codes_shape = static_cast<index_t>(point_count);
CodeArray code_array(codes_shape);
return_lists[1].append(code_array);
codes_ptrs[i] = code_array.mutable_data();
}
}
}
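// Second pass: fill the arrays without holding the Lock.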
for (decltype(local.line_count) i = 0; i < local.line_count; ++i) {
auto point_start = local.line_offsets.start[i];
auto point_end = local.line_offsets.start[i+1];
auto point_count = point_end - point_start;
assert(point_count > 1);
Converter::convert_points(
point_count, local.points.start + 2*point_start, points_ptrs[i]);
if (separate_code) {
Converter::convert_codes_check_closed_single(
point_count, local.points.start + 2*point_start, codes_ptrs[i]);
}
}
break;
}
case LineType::ChunkCombinedCode: {
assert(has_direct_points() && !has_direct_line_offsets());
// return_lists[0][local.chunk] already contains points.
index_t codes_shape = static_cast<index_t>(local.total_point_count);
CodeArray::value_type* codes_ptr = nullptr;
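// Create the codes array under the Lock; it is filled below after the Lock has been released.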
{
Lock lock(*this); // cppcheck-suppress unreadVariable
CodeArray code_array(codes_shape);
return_lists[1][local.chunk] = code_array;
codes_ptr = code_array.mutable_data();
}
Converter::convert_codes_check_closed(
local.total_point_count, local.line_count + 1, local.line_offsets.start,
local.points.start, codes_ptr);
break;
}
case LineType::ChunkCombinedOffset:
assert(has_direct_points() && has_direct_line_offsets());
// return_lists[0][local.chunk] already contains points.
// return_lists[1][local.chunk] already contains line offsets.
break;
case LineType::ChunkCombinedNan:
assert(has_direct_points());
// return_lists[0][local.chunk] already contains points.
break;
}
}

index_t ThreadedContourGenerator::get_thread_count() const
{
return _n_threads;
}

index_t ThreadedContourGenerator::limit_n_threads(index_t n_threads, index_t n_chunks)
{
index_t max_threads = std::max<index_t>(Util::get_max_threads(), 1);
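// A thread count of zero requests the maximum sensible number of threads; never use more
// threads than there are chunks or than the hardware supports.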
if (n_threads == 0)
return std::min(max_threads, n_chunks);
else
return std::min({max_threads, n_chunks, n_threads});
}

void ThreadedContourGenerator::march(std::vector<py::list>& return_lists)
{
// Each thread executes thread_function() which has two stages:
// 1) Initialise cache z-levels and starting locations
// 2) Trace contours
// Each stage is performed on a chunk by chunk basis. There is a barrier between the two stages
// to synchronise the threads so the cache setup is complete before being used by the trace.
_next_chunk = 0; // Next available chunk index.
_finished_count = 0; // Count of threads that have finished the cache init.
// Main thread releases GIL for remainder of this function.
// It is temporarily reacquired as necessary within the scope of threaded Lock objects.
py::gil_scoped_release release;
// Create (_n_threads-1) new worker threads.
std::vector<std::thread> threads;
threads.reserve(_n_threads-1);
for (index_t i = 0; i < _n_threads-1; ++i)
threads.emplace_back(
&ThreadedContourGenerator::thread_function, this, std::ref(return_lists));
thread_function(std::ref(return_lists)); // Main thread work.
for (auto& thread : threads)
thread.join();
assert(_next_chunk == 2*get_n_chunks());
threads.clear();
}

void ThreadedContourGenerator::thread_function(std::vector<py::list>& return_lists)
{
// Function that is executed by each of the threads.
// _next_chunk starts at zero and increases up to 2*_n_chunks. A thread in need of work reads
// _next_chunk and increments it, then processes that chunk. For _next_chunk < _n_chunks this
// is stage 1 (init cache levels and starting locations) and for _next_chunk >= _n_chunks this
// is stage 2 (trace contours). There is a synchronisation barrier between the two stages so
// that the cache initialisation is complete before being used by the contour trace.
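// For example, if there are 4 chunks then _next_chunk values 0 to 3 correspond to stage 1 on
// chunks 0 to 3, and values 4 to 7 correspond to stage 2 on chunks 0 to 3.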
auto n_chunks = get_n_chunks();
index_t chunk;
ChunkLocal local;
// Stage 1: Initialise cache z-levels and starting locations.
while (true) {
{
std::lock_guard<std::mutex> guard(_chunk_mutex);
if (_next_chunk < n_chunks)
chunk = _next_chunk++;
else
break; // No more work to do.
}
get_chunk_limits(chunk, local);
init_cache_levels_and_starts(&local);
local.clear();
}
{
// Implementation of a multithreaded barrier. Each thread increments the shared counter and
// the last thread to finish notifies the other threads that they can all continue.
std::unique_lock<std::mutex> lock(_chunk_mutex);
_finished_count++;
if (_finished_count == _n_threads)
_condition_variable.notify_all();
else
_condition_variable.wait(lock);
}
// Stage 2: Trace contours.
while (true) {
{
std::lock_guard<std::mutex> guard(_chunk_mutex);
if (_next_chunk < 2*n_chunks)
chunk = _next_chunk++ - n_chunks;
else
break; // No more work to do.
}
get_chunk_limits(chunk, local);
march_chunk(local, return_lists);
local.clear();
}
}

} // namespace contourpy