las3_pub / predictable_parallel_patterns
Commit 9fa9296a, authored 5 years ago by FritzFlorian
Final version ready for benchmarking.
Parent: 7d090b3c (branch: master)
Pipeline #1542 failed with stages in 61 minutes 12 seconds.
Showing 27 changed files with 246 additions and 140 deletions.
CMakeLists.txt                                                         +3   -0
app/benchmark_fft/CMakeLists.txt                                       +2   -0
app/benchmark_fib/CMakeLists.txt                                       +2   -0
app/benchmark_matrix/CMakeLists.txt                                    +2   -0
app/benchmark_matrix/main.cpp                                          +11  -0
app/benchmark_matrix_div_conquer/CMakeLists.txt                        +2   -0
app/benchmark_matrix_div_conquer/main.cpp                              +3   -7
app/benchmark_unbalanced/CMakeLists.txt                                +2   -0
app/benchmark_unbalanced/main.cpp                                      +1   -1
extern/benchmark_base/CMakeLists.txt                                   +1   -1
extern/benchmark_base/include/benchmark_base/matrix_div_conquer.h      +23  -48
extern/benchmark_base/src/matrix_div_conquer.cpp                       +64  -0
extern/benchmark_runner/benchmark_runner.h                             +1   -1
lib/pls/CMakeLists.txt                                                 +1   -1
lib/pls/include/pls/algorithms/divide_and_conquer_buffers.h            +0   -5
lib/pls/include/pls/algorithms/for_each.h                              +4   -4
lib/pls/include/pls/algorithms/for_each_impl.h                         +45  -15
lib/pls/include/pls/internal/base/error_handling.h                     +5   -0
lib/pls/include/pls/internal/base/stack_allocator.h                    +1   -0
lib/pls/include/pls/internal/scheduling/base_task.h                    +5   -4
lib/pls/include/pls/internal/scheduling/lock_free/task.h               +1   -3
lib/pls/include/pls/internal/scheduling/scheduler_impl.h               +18  -4
lib/pls/include/pls/pls.h                                              +3   -0
lib/pls/src/internal/scheduling/lock_free/external_trading_deque.cpp   +22  -16
lib/pls/src/internal/scheduling/lock_free/task.cpp                     +12  -20
lib/pls/src/internal/scheduling/lock_free/task_manager.cpp             +5   -4
lib/pls/src/internal/scheduling/scheduler.cpp                          +7   -6
CMakeLists.txt (view file @ 9fa9296a)

@@ -43,6 +43,9 @@ ADD_CUSTOM_TARGET(install.pls
         -P ${CMAKE_BINARY_DIR}/cmake_install.cmake)
 ADD_DEPENDENCIES(install.pls context_switcher pls)
+# ... second custom target to only build the benchmarks.
+ADD_CUSTOM_TARGET(benchmark.pls)
+
 # Include examples
 add_subdirectory(app/playground)
 add_subdirectory(app/benchmark_fft)
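With the new aggregate target in place (each benchmark below registers itself as a dependency of it), all benchmark executables can presumably be built in one step with a standard CMake invocation:

    cmake --build <build-dir> --target benchmark.pls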
app/benchmark_fft/CMakeLists.txt (view file @ 9fa9296a)

add_executable(benchmark_fft_pls_v3 main.cpp)
target_link_libraries(benchmark_fft_pls_v3 pls benchmark_runner benchmark_base)
ADD_DEPENDENCIES(benchmark.pls benchmark_fft_pls_v3)

if (EASY_PROFILER)
    target_link_libraries(benchmark_fft_pls_v3 easy_profiler)
endif ()
app/benchmark_fib/CMakeLists.txt (view file @ 9fa9296a)

add_executable(benchmark_fib_pls_v3 main.cpp)
target_link_libraries(benchmark_fib_pls_v3 pls benchmark_runner benchmark_base)
ADD_DEPENDENCIES(benchmark.pls benchmark_fib_pls_v3)

if (EASY_PROFILER)
    target_link_libraries(benchmark_fib_pls_v3 easy_profiler)
endif ()
app/benchmark_matrix/CMakeLists.txt (view file @ 9fa9296a)

add_executable(benchmark_matrix_pls_v3 main.cpp)
target_link_libraries(benchmark_matrix_pls_v3 pls benchmark_runner benchmark_base)
ADD_DEPENDENCIES(benchmark.pls benchmark_matrix_pls_v3)

if (EASY_PROFILER)
    target_link_libraries(benchmark_matrix_pls_v3 easy_profiler)
endif ()
app/benchmark_matrix/main.cpp (view file @ 9fa9296a)

@@ -37,6 +37,12 @@ int main(int argc, char **argv) {
   pls::scheduler scheduler{(unsigned) settings.num_threads_, MAX_NUM_TASKS, MAX_STACK_SIZE};

   if (settings.type_ == benchmark_runner::benchmark_settings::ISOLATED) {
+#if PLS_PROFILING_ENABLED
+    scheduler.get_profiler().disable_memory_measure();
+    runner.add_custom_stats_field("T_1");
+    runner.add_custom_stats_field("T_inf");
+#endif
     printf("Running isolated measurement...\n");
     runner.enable_memory_stats();
     runner.pre_allocate_stats();
@@ -45,6 +51,11 @@ int main(int argc, char **argv) {
         scheduler.perform_work([&]() {
           result.multiply(a, b);
         });
       },
       [&]() {},
       [&]() {
+#if PLS_PROFILING_ENABLED
+        runner.store_custom_stat("T_1", scheduler.get_profiler().current_run().t_1_);
+        runner.store_custom_stat("T_inf", scheduler.get_profiler().current_run().t_inf_);
+#endif
       });
     runner.commit_results(true);
   } else {
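For context: T_1 and T_inf are the profiler's work and span (critical-path) measurements taken from the recorded task DAG of the current run. The diff itself does not define them, but in standard work-span analysis they bound the running time T_P on P workers:

    T_P >= max(T_1 / P, T_inf)

so storing them next to the measured wall times makes it easy to compare each run against its theoretical speedup limit of T_1 / T_inf.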
app/benchmark_matrix_div_conquer/CMakeLists.txt (view file @ 9fa9296a)

add_executable(benchmark_matrix_div_conquer_pls_v3 main.cpp)
target_link_libraries(benchmark_matrix_div_conquer_pls_v3 pls benchmark_runner benchmark_base)
ADD_DEPENDENCIES(benchmark.pls benchmark_matrix_div_conquer_pls_v3)

if (EASY_PROFILER)
    target_link_libraries(benchmark_matrix_div_conquer_pls_v3 easy_profiler)
endif ()
app/benchmark_matrix_div_conquer/main.cpp (view file @ 9fa9296a)

@@ -92,12 +92,10 @@ void multiply_div_conquer(const std::vector<std::vector<std::vector<std::unique_
     pls::spawn([&]() {
       multiply_div_conquer(tmp_arrays, local_indices, size / 2, depth + 1, 6, result_2_2_a, a_2_1, b_1_2);
     });
-    pls::spawn([&]() {
+    pls::spawn_and_sync([&]() {
       multiply_div_conquer(tmp_arrays, local_indices, size / 2, depth + 1, 7, result_2_2_b, a_2_2, b_2_2);
     });
-    pls::sync();

     // Combine results
     for (size_t i = 0; i < (size / 2) * (size / 2); i++) {
       // The layout is not important here, as all have the same order, so just sum element-wise
@@ -108,7 +106,6 @@ void multiply_div_conquer(const std::vector<std::vector<std::vector<std::unique_
   }
 }

-constexpr int MAX_NUM_TASKS = 10;
 constexpr int MAX_STACK_SIZE = 4096 * 1;

 int main(int argc, char **argv) {
@@ -128,7 +125,6 @@ int main(int argc, char **argv) {
   // Fill data arrays as needed
   a.fill_default_data();
   b.fill_default_data();
-  matrix_div_conquer::fill_block_lookup(size);

   // Strain local data
   std::vector<std::vector<std::vector<std::unique_ptr<double[]>>>> div_conquer_temp_arrays;
@@ -138,7 +134,7 @@ int main(int argc, char **argv) {
   while (remaining_size > matrix_div_conquer::CUTOFF_SIZE) {
     auto &depth_buffers = div_conquer_temp_arrays.emplace_back();
     buffers_needed = std::min(buffers_needed, (size_t) settings.num_threads_);
-    for (int thread_id = 0; thread_id < buffers_needed; thread_id++) {
+    for (size_t thread_id = 0; thread_id < buffers_needed; thread_id++) {
       auto &depth_thread_buffers = depth_buffers.emplace_back();
       for (int i = 0; i < 8; i++) {
         size_t matrix_elements = (remaining_size / 2) * (remaining_size / 2);
@@ -159,7 +155,7 @@ int main(int argc, char **argv) {
   string full_directory = settings.output_directory_ + "/PLS_v3/";
   benchmark_runner runner{full_directory, test_name};
-  pls::scheduler scheduler{(unsigned) settings.num_threads_, MAX_NUM_TASKS, MAX_STACK_SIZE};
+  pls::scheduler scheduler{(unsigned) settings.num_threads_, max_depth + 2, MAX_STACK_SIZE};

   if (settings.type_ == benchmark_runner::benchmark_settings::ISOLATED) {
     printf("Running isolated measurement...\n");
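The substantive change above is at the tail of the eight recursive multiplications: the last pls::spawn followed by a separate pls::sync() is fused into a single pls::spawn_and_sync, and the scheduler is now sized from the actual recursion depth (max_depth + 2) instead of the fixed MAX_NUM_TASKS constant. A minimal sketch of the pattern (last_child is a hypothetical placeholder, not repo code):

    // Before this commit:
    pls::spawn([&] { last_child(); });  // spawn the final sub-task...
    pls::sync();                        // ...then immediately wait for all children

    // After: the final sub-task and the join are fused into one call,
    // avoiding a spawn that would only be waited on right away.
    pls::spawn_and_sync([&] { last_child(); });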
app/benchmark_unbalanced/CMakeLists.txt (view file @ 9fa9296a)

add_executable(benchmark_unbalanced_pls_v3 main.cpp)
target_link_libraries(benchmark_unbalanced_pls_v3 benchmark_runner benchmark_base pls)
ADD_DEPENDENCIES(benchmark.pls benchmark_unbalanced_pls_v3)

if (EASY_PROFILER)
    target_link_libraries(benchmark_unbalanced_pls_v3 easy_profiler)
endif ()
app/benchmark_unbalanced/main.cpp (view file @ 9fa9296a)

@@ -31,7 +31,7 @@ int unbalanced_tree_search(int seed, int root_children, double q, int normal_chi
   return count_child_nodes(root);
 }

-constexpr int MAX_NUM_TASKS = 256;
+constexpr int MAX_NUM_TASKS = 180;
 constexpr int MAX_STACK_SIZE = 4096 * 1;

 int main(int argc, char **argv) {
extern/benchmark_base/CMakeLists.txt (view file @ 9fa9296a)

@@ -9,7 +9,7 @@ add_library(benchmark_base STATIC
         include/benchmark_base/unbalanced.h src/unbalanced.cpp
         include/benchmark_base/range.h
         include/benchmark_base/fib.h
-        include/benchmark_base/matrix_div_conquer.h)
+        include/benchmark_base/matrix_div_conquer.h src/matrix_div_conquer.cpp)

 target_include_directories(benchmark_base PUBLIC
extern/benchmark_base/include/benchmark_base/matrix_div_conquer.h (view file @ 9fa9296a)

 #ifndef COMPARISON_BENCHMARKS_BASE_MATRIX_DIV_CONQUER_H
 #define COMPARISON_BENCHMARKS_BASE_MATRIX_DIV_CONQUER_H

 #include <array>
 #include <vector>
 #include <cstdio>
 #include <cstdlib>

 namespace comparison_benchmarks {
 namespace base {
 namespace matrix_div_conquer {

 const int MATRIX_SIZE = 128;
 const int CUTOFF_SIZE = 8;
 const int NUM_ITERATIONS = 100;
 const int WARMUP_ITERATIONS = 10;

-// Helpers to directly index into blocked matrices
-const size_t MAX_SIZE = 128;
-std::array<std::array<size_t, MAX_SIZE>, MAX_SIZE> BLOCK_LOOKUP; // ROW, COLUMN
-
-void fill_block_lookup(size_t size = MAX_SIZE) {
-  if (size <= 1) {
-    BLOCK_LOOKUP[0][0] = 0;
-    return;
-  }
-
-  fill_block_lookup(size / 2);
-  size_t elements_per_quarter = (size / 2) * (size / 2);
-  for (size_t row = 0; row < size / 2; row++) {
-    for (size_t column = 0; column < size / 2; column++) {
-      BLOCK_LOOKUP[row][size / 2 + column] = BLOCK_LOOKUP[row][column] + elements_per_quarter;
-      BLOCK_LOOKUP[size / 2 + row][column] = BLOCK_LOOKUP[row][column] + 2 * elements_per_quarter;
-      BLOCK_LOOKUP[size / 2 + row][size / 2 + column] = BLOCK_LOOKUP[row][column] + 3 * elements_per_quarter;
-    }
-  }
-}
-
 class blocked_matrix_view {
  public:
-  blocked_matrix_view(double *data, size_t size) : data_{data}, size_{size} {}
-
-  void fill_default_data() {
-    for (size_t row = 0; row < size_; row++) {
-      for (size_t column = 0; column < size_; column++) {
-        at(row, column) = row;
-      }
-    }
-  }
+  blocked_matrix_view(double *data, size_t size) : data_{data}, size_{size} {
+    if (size > BLOCK_LOOKUPS_SIZE) {
+      init_block_lookup(size);
+    }
+  }
+
+  void fill_default_data();

   blocked_matrix_view quadrant_1_1() {
     size_t elements_per_quarter = (size_ / 2) * (size_ / 2);
     return blocked_matrix_view(data_ + 0 * elements_per_quarter, size_ / 2);
@@ -64,7 +40,7 @@ class blocked_matrix_view {
   }

   double &at(size_t row, size_t column) {
-    return data_[BLOCK_LOOKUP[row][column]];
+    return data_[BLOCK_LOOKUPS[block_lookup_at(row, column)]];
   }

   double *get_data() {
@@ -72,22 +48,21 @@ class blocked_matrix_view {
   }

  private:
-  double *data_;
-  size_t size_;
-};
+  double *const data_;
+  const size_t size_;

-void multiply_naive(size_t size, blocked_matrix_view &result, blocked_matrix_view &a, blocked_matrix_view &b) {
-  for (size_t i = 0; i < size; i++) {
-    for (size_t j = 0; j < size; j++) {
-      result.at(i, j) = 0;
-    }
-    for (size_t j = 0; j < size; j++) {
-      for (size_t k = 0; k < size; k++) {
-        result.at(i, j) += a.at(i, k) * b.at(k, j);
-      }
-    }
-  }
-}
+  // Lookup indices for non divide-conquer block lookups
+  static std::vector<size_t> BLOCK_LOOKUPS;
+  static size_t BLOCK_LOOKUPS_SIZE;
+
+  static void fill_block_lookup(size_t size, std::vector<size_t> &BLOCK_LOOKUP);
+  static void init_block_lookup(size_t max_size);
+  static size_t block_lookup_at(size_t row, size_t column) {
+    return row * BLOCK_LOOKUPS_SIZE + column;
+  }
+};
+
+void multiply_naive(size_t size, blocked_matrix_view &result, blocked_matrix_view &a, blocked_matrix_view &b);

 }
 }
extern/benchmark_base/src/matrix_div_conquer.cpp (new file @ 9fa9296a, 0 → 100644)

#include <cstdio>
#include <cstdlib>

#include "benchmark_base/matrix_div_conquer.h"

namespace comparison_benchmarks {
namespace base {
namespace matrix_div_conquer {

void multiply_naive(size_t size, blocked_matrix_view &result, blocked_matrix_view &a, blocked_matrix_view &b) {
  for (size_t i = 0; i < size; i++) {
    for (size_t j = 0; j < size; j++) {
      result.at(i, j) = 0;
    }
    for (size_t j = 0; j < size; j++) {
      for (size_t k = 0; k < size; k++) {
        result.at(i, j) += a.at(i, k) * b.at(k, j);
      }
    }
  }
}

void blocked_matrix_view::fill_default_data() {
  for (size_t row = 0; row < size_; row++) {
    for (size_t column = 0; column < size_; column++) {
      at(row, column) = row;
    }
  }
}

std::vector<size_t> blocked_matrix_view::BLOCK_LOOKUPS;
size_t blocked_matrix_view::BLOCK_LOOKUPS_SIZE;

void blocked_matrix_view::fill_block_lookup(size_t size, std::vector<size_t> &BLOCK_LOOKUP) {
  if (size <= 1) {
    BLOCK_LOOKUP[block_lookup_at(0, 0)] = 0;
    return;
  }

  fill_block_lookup(size / 2, BLOCK_LOOKUP);
  size_t elements_per_quarter = (size / 2) * (size / 2);
  for (size_t row = 0; row < size / 2; row++) {
    for (size_t column = 0; column < size / 2; column++) {
      BLOCK_LOOKUP[block_lookup_at(row, size / 2 + column)] =
          BLOCK_LOOKUP[block_lookup_at(row, column)] + elements_per_quarter;
      BLOCK_LOOKUP[block_lookup_at(size / 2 + row, column)] =
          BLOCK_LOOKUP[block_lookup_at(row, column)] + 2 * elements_per_quarter;
      BLOCK_LOOKUP[block_lookup_at(size / 2 + row, size / 2 + column)] =
          BLOCK_LOOKUP[block_lookup_at(row, column)] + 3 * elements_per_quarter;
    }
  }
}

void blocked_matrix_view::init_block_lookup(size_t max_size) {
  if (BLOCK_LOOKUPS.size() < max_size) {
    BLOCK_LOOKUPS = std::vector<size_t>(max_size * max_size);
    BLOCK_LOOKUPS_SIZE = max_size;
    fill_block_lookup(max_size, BLOCK_LOOKUPS);
  }
}

}
}
}
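For intuition: fill_block_lookup lays the quadrants out recursively (1_1, 1_2, 2_1, 2_2, each itself blocked the same way), a Z-order/Morton-style layout, so every quadrant occupies one contiguous range and quadrant_1_1() etc. can hand out sub-views by simple pointer offsets. A self-contained sketch (not repo code) that reproduces the lookup table for a 4x4 matrix:

    #include <cstdio>
    #include <vector>

    // Standalone re-implementation of the recursion above, for illustration only.
    static void fill(std::vector<size_t> &lut, size_t n, size_t size) {
      if (size <= 1) { lut[0] = 0; return; }
      fill(lut, n, size / 2);
      size_t quarter = (size / 2) * (size / 2);
      for (size_t r = 0; r < size / 2; r++) {
        for (size_t c = 0; c < size / 2; c++) {
          lut[r * n + size / 2 + c] = lut[r * n + c] + quarter;                  // quadrant 1_2
          lut[(size / 2 + r) * n + c] = lut[r * n + c] + 2 * quarter;            // quadrant 2_1
          lut[(size / 2 + r) * n + size / 2 + c] = lut[r * n + c] + 3 * quarter; // quadrant 2_2
        }
      }
    }

    int main() {
      const size_t n = 4;
      std::vector<size_t> lut(n * n);
      fill(lut, n, n);
      // Prints the blocked offsets:
      //  0  1  4  5
      //  2  3  6  7
      //  8  9 12 13
      // 10 11 14 15
      for (size_t r = 0; r < n; r++) {
        for (size_t c = 0; c < n; c++) std::printf("%2zu ", lut[r * n + c]);
        std::printf("\n");
      }
      return 0;
    }

Each 2x2 sub-block is contiguous, and so is each quadrant, which is exactly what the divide-and-conquer multiplication needs.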
extern/benchmark_runner/benchmark_runner.h (view file @ 9fa9296a)

@@ -299,7 +299,7 @@ class benchmark_runner {
     long wall_time_us = 0;
     wall_time_us += (finish_time.tv_sec - iteration_start.tv_sec) * 1000l * 1000l;
     wall_time_us += ((long) finish_time.tv_nsec - (long) iteration_start.tv_nsec) / 1000l;
-    printf("Difference: %d\n", wall_time_us - times_[current_iteration]);
+    printf("Difference: %ld\n", wall_time_us - times_[current_iteration]);
     times_[current_iteration] = wall_time_us;

     if (finish_time.tv_sec >= deadline_end.tv_sec && finish_time.tv_nsec > deadline_end.tv_nsec) {
lib/pls/CMakeLists.txt (view file @ 9fa9296a)

@@ -49,7 +49,7 @@ add_library(pls STATIC
         include/pls/internal/profiling/dag_node.h src/internal/profiling/dag_node.cpp
         include/pls/internal/profiling/profiler.h src/internal/profiling/profiler.cpp
-        include/pls/internal/profiling/thread_stats.h src/internal/profiling/thread_stats.cpp
-        include/pls/algorithms/divide_and_conquer_buffers.h)
+        include/pls/internal/profiling/thread_stats.h src/internal/profiling/thread_stats.cpp)

 # Dependencies for pls
 target_link_libraries(pls Threads::Threads)
lib/pls/include/pls/algorithms/divide_and_conquer_buffers.h (deleted, 100644 → 0; view file @ 7d090b3c)

-#ifndef PLS_ALGORITHMS_DIVIDE_AND_CONQUER_BUFFERS_H_
-#define PLS_ALGORITHMS_DIVIDE_AND_CONQUER_BUFFERS_H_
-
-#endif //PLS_ALGORITHMS_DIVIDE_AND_CONQUER_BUFFERS_H_
lib/pls/include/pls/algorithms/for_each.h (view file @ 9fa9296a)

@@ -7,14 +7,14 @@
 namespace pls::algorithm {

 template<typename Function, typename ExecutionStrategy>
-static void for_each_range(unsigned long first,
-                           unsigned long last,
+static void for_each_range(long first,
+                           long last,
                            const Function &function,
                            ExecutionStrategy &execution_strategy);

 template<typename Function>
-static void for_each_range(unsigned long first,
-                           unsigned long last,
+static void for_each_range(long first,
+                           long last,
                            const Function &function);

 template<typename RandomIt, typename Function, typename ExecutionStrategy>
lib/pls/include/pls/algorithms/for_each_impl.h (view file @ 9fa9296a)

@@ -10,10 +10,10 @@ namespace pls::algorithm {
 namespace internal {

 template<typename RandomIt, typename Function>
-static void for_each(const RandomIt first,
-                     const RandomIt last,
-                     const Function &function,
-                     const long min_elements) {
+static void for_each_iterator(const RandomIt first,
+                              const RandomIt last,
+                              const Function &function,
+                              const size_t min_elements) {
   using namespace ::pls::internal::scheduling;

   const long num_elements = std::distance(first, last);
@@ -27,13 +27,45 @@ static void for_each(const RandomIt first,
     const long middle_index = num_elements / 2;

     scheduler::spawn([first, middle_index, last, &function, min_elements] {
-      internal::for_each(first, first + middle_index, function, min_elements);
+      internal::for_each_iterator(first, first + middle_index, function, min_elements);
     });
     scheduler::spawn_and_sync([first, middle_index, last, &function, min_elements] {
-      internal::for_each(first + middle_index, last, function, min_elements);
+      internal::for_each_iterator(first + middle_index, last, function, min_elements);
     });
   }
 }
+
+template<typename Function>
+static void for_each_range(const long first,
+                           const long last,
+                           const Function &function,
+                           const size_t min_elements) {
+  using namespace ::pls::internal::scheduling;
+
+  const long num_elements = last - first;
+  if (num_elements <= min_elements) {
+    // calculate last elements in loop to avoid overhead
+    for (auto current = first; current != last; current++) {
+      function(current);
+    }
+  } else {
+    // Cut in half recursively
+    const long middle_index = num_elements / 2;
+
+    scheduler::spawn([first, middle_index, last, &function, min_elements] {
+      internal::for_each_range(first, first + middle_index, function, min_elements);
+    });
+    scheduler::spawn_and_sync([first, middle_index, last, &function, min_elements] {
+      internal::for_each_range(first + middle_index, last, function, min_elements);
+    });
@@ -44,15 +76,14 @@ static void for_each(const RandomIt first,
   }
 }

 template<typename RandomIt, typename Function, typename ExecutionStrategy>
 static void for_each(RandomIt first,
                      RandomIt last,
                      const Function &function,
                      ExecutionStrategy execution_strategy) {
   long num_elements = std::distance(first, last);
-  return internal::for_each(first, last, function, execution_strategy.calculate_min_elements(num_elements));
+  internal::for_each_iterator(first, last, function, execution_strategy.calculate_min_elements(num_elements));
 }

 template<typename RandomIt, typename Function>
@@ -61,20 +92,19 @@ static void for_each(RandomIt first, RandomIt last, const Function &function) {
 }

 template<typename Function, typename ExecutionStrategy>
-static void for_each_range(unsigned long first,
-                           unsigned long last,
+static void for_each_range(long first,
+                           long last,
                            const Function &function,
                            ExecutionStrategy execution_strategy) {
-  auto range = pls::internal::helpers::range(first, last);
-  return for_each(range.begin(), range.end(), function, execution_strategy);
+  long num_elements = last - first;
+  return internal::for_each_range(first, last, function, execution_strategy.calculate_min_elements(num_elements));
 }

 template<typename Function>
-static void for_each_range(unsigned long first,
-                           unsigned long last,
+static void for_each_range(long first,
+                           long last,
                            const Function &function) {
-  auto range = pls::internal::helpers::range(first, last);
-  return for_each(range.begin(), range.end(), function);
+  return for_each_range(first, last, function, dynamic_strategy{4});
 }

 }
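A hedged usage sketch of the reworked range API: after this commit the bounds are signed long and the index variant recurses directly instead of going through the iterator-range helper; the overload without a strategy defaults to dynamic_strategy{4}. Scheduler constructor values below are illustrative, mirroring the benchmark mains above:

    #include <vector>
    #include "pls/pls.h"
    #include "pls/algorithms/for_each.h"

    int main() {
      pls::scheduler scheduler{4u, 16, 4096 * 8};  // threads, max tasks, stack size (illustrative)
      std::vector<double> data(1024, 1.0);

      scheduler.perform_work([&] {
        // Recursively splits [0, size) and spawns the halves as tasks;
        // the functor receives the loop index.
        pls::algorithm::for_each_range(0, (long) data.size(), [&](long i) {
          data[i] *= 2.0;
        });
      });
      return 0;
    }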
lib/pls/include/pls/internal/base/error_handling.h (view file @ 9fa9296a)

@@ -18,4 +18,9 @@ void pls_error(const char *msg);
 // TODO: Distinguish between debug/internal asserts and production asserts.
 #define PLS_ASSERT(cond, msg) if (!(cond)) { pls_error(msg); }

+// Enable/Disable more expensive asserts.
+// On very small workloads also the 'normal' asserts can be disabled for more performance.
+//#define PLS_ASSERT_EXPENSIVE(cond, msg) if (!(cond)) { pls_error(msg); }
+#define PLS_ASSERT_EXPENSIVE(cond, msg)
+
 #endif //PLS_ERROR_HANDLING_H
lib/pls/include/pls/internal/base/stack_allocator.h (view file @ 9fa9296a)

@@ -5,6 +5,7 @@
 #include <cstddef>

 namespace pls::internal::base {

 class stack_allocator {
  public:
   virtual char *allocate_stack(size_t size) = 0;
lib/pls/include/pls/internal/scheduling/base_task.h (view file @ 9fa9296a)

@@ -56,12 +56,13 @@ struct base_task {
   }

   // General task information
-  unsigned depth_;
-  unsigned thread_id_;
+  const unsigned depth_;
+  const unsigned thread_id_;

   // Stack/continuation management
-  char *stack_memory_;
-  size_t stack_size_;
+  char *const stack_memory_;
+  const size_t stack_size_;
   context_switcher::continuation continuation_;
   bool is_synchronized_;
   bool is_serial_section_;
lib/pls/include/pls/internal/scheduling/lock_free/task.h (view file @ 9fa9296a)

@@ -30,11 +30,9 @@ struct task : public base_task {
   static task *find_task(unsigned id, unsigned depth);

  private:
-  std::atomic<int> num_resources_{};
-
   // STAMP = thread id of 'owning' thread before task was inserted into stack.
   // VALUE = next item in stack, indicated by thread ID.
-  std::atomic<data_structures::stamped_integer> resource_stack_next_{{0, 0}};
+  PLS_CACHE_ALIGN std::atomic<data_structures::stamped_integer> resource_stack_next_{{0, 0}};

   // STAMP = CAS stamp, half CAS length (16 or 32 Bit)
   // VALUE = Root of the actual stack, indicated by thread ID (16 or 32 Bit)
lib/pls/include/pls/internal/scheduling/scheduler_impl.h (view file @ 9fa9296a)

@@ -63,6 +63,20 @@ scheduler::scheduler(unsigned int num_threads,
       work_thread_main_loop();
     });
   }
+
+  // Make sure all threads are created and touched their stacks.
+  // Executing a work section ensures one wakeup/sleep cycle of all workers
+  // and explicitly forcing one task per worker forces them to initialize their stacks.
+  std::atomic<unsigned> num_spawned;
+  this->perform_work([&]() {
+    for (unsigned i = 0; i < num_threads; i++) {
+      spawn([&]() {
+        num_spawned++;
+        while (num_spawned < num_threads) std::this_thread::yield();
+      });
+    }
+    sync();
+  });
 }

 class scheduler::init_function {
@@ -195,7 +209,7 @@ void scheduler::spawn_internal(Function &&lambda) {
 #if PLS_SLEEP_WORKERS_ON_EMPTY
     // TODO: relax atomic operations on empty flag
     data_structures::stamped_integer queue_empty_flag = spawning_state.get_queue_empty_flag().load();
-    switch (queue_empty_flag.value) {
+    switch (queue_empty_flag.value_) {
       case EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY: {
         // The queue was not found empty, ignore it.
         break;
@@ -203,9 +217,9 @@ void scheduler::spawn_internal(Function &&lambda) {
       case EMPTY_QUEUE_STATE::QUEUE_MAYBE_EMPTY: {
         // Someone tries to mark us empty and might be re-stealing right now.
         data_structures::stamped_integer
-            queue_non_empty_flag{queue_empty_flag.stamp++, EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY};
+            queue_non_empty_flag{queue_empty_flag.stamp_++, EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY};
         auto actual_empty_flag = spawning_state.get_queue_empty_flag().exchange(queue_non_empty_flag);
-        if (actual_empty_flag.value == EMPTY_QUEUE_STATE::QUEUE_EMPTY) {
+        if (actual_empty_flag.value_ == EMPTY_QUEUE_STATE::QUEUE_EMPTY) {
          spawning_state.get_scheduler().empty_queue_decrease_counter_and_wake();
         }
         break;
@@ -213,7 +227,7 @@ void scheduler::spawn_internal(Function &&lambda) {
       case EMPTY_QUEUE_STATE::QUEUE_EMPTY: {
         // Someone already marked the queue empty, we must revert its action on the central queue.
         data_structures::stamped_integer
-            queue_non_empty_flag{queue_empty_flag.stamp++, EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY};
+            queue_non_empty_flag{queue_empty_flag.stamp_++, EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY};
         spawning_state.get_queue_empty_flag().store(queue_non_empty_flag);
         spawning_state.get_scheduler().empty_queue_decrease_counter_and_wake();
         break;
lib/pls/include/pls/pls.h (view file @ 9fa9296a)

@@ -10,6 +10,7 @@
 #include "pls/internal/scheduling/scheduler.h"
 #include "pls/internal/scheduling/strain_local_resource.h"
 #include "pls/internal/base/stack_allocator.h"
+#include "pls/internal/helpers/range.h"
 #include "pls/internal/helpers/member_function.h"
@@ -18,6 +19,8 @@ namespace pls {

 // 'basic' fork-join APIs
 using internal::scheduling::scheduler;
+using internal::base::heap_stack_allocator;
+using internal::base::mmap_stack_allocator;

 template<typename Function>
 static void spawn(Function &&function) {
   scheduler::spawn(std::forward<Function>(function));
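Taken together, the exports in pls.h form the public fork-join surface used throughout the benchmarks in this commit. A minimal sketch (constructor values are illustrative, not prescribed by the library):

    #include "pls/pls.h"

    static int fib(int n) {
      if (n <= 1) return n;
      int left, right;
      pls::spawn([&] { left = fib(n - 1); });           // may run on another worker
      pls::spawn_and_sync([&] { right = fib(n - 2); }); // runs the last child, then joins
      return left + right;
    }

    int main() {
      pls::scheduler scheduler{4u, 32, 4096 * 8};  // threads, max tasks, stack size (illustrative)
      int result = 0;
      scheduler.perform_work([&] { result = fib(20); });
      return result == 6765 ? 0 : 1;
    }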
lib/pls/src/internal/scheduling/lock_free/external_trading_deque.cpp (view file @ 9fa9296a)

@@ -5,7 +5,7 @@
 namespace pls::internal::scheduling::lock_free {

 traded_cas_field external_trading_deque::peek_traded_object(task *target_task) {
-  traded_cas_field current_cas = target_task->external_trading_deque_cas_.load();
+  traded_cas_field current_cas = target_task->external_trading_deque_cas_.load(std::memory_order_relaxed);
   return current_cas;
 }
@@ -17,7 +17,9 @@ task *external_trading_deque::get_trade_object(task *target_task,
   traded_cas_field empty_cas = peeked_cas;
   empty_cas.make_empty();

-  if (target_task->external_trading_deque_cas_.compare_exchange_strong(current_cas, empty_cas)) {
+  if (target_task->external_trading_deque_cas_.compare_exchange_strong(current_cas,
+                                                                       empty_cas,
+                                                                       std::memory_order_acq_rel)) {
     task *result = task::find_task(result_id, target_task->depth_);
     return result;
   }
@@ -50,8 +52,8 @@ void external_trading_deque::reset_bot_and_top() {
   bot_internal_.value_ = 0;
   bot_internal_.stamp_++;

-  bot_.store(0);
-  top_.store({bot_internal_.stamp_, 0});
+  bot_.store(0, std::memory_order_release);
+  top_.store({bot_internal_.stamp_, 0}, std::memory_order_release);
 }

 task *external_trading_deque::pop_bot() {
@@ -83,11 +85,11 @@ task *external_trading_deque::pop_bot() {
 }

 external_trading_deque::peek_result external_trading_deque::peek_top() {
-  auto local_top = top_.load();
-  auto local_bot = bot_.load();
+  auto local_top = top_.load(std::memory_order_acquire);
+  auto local_bot = bot_.load(std::memory_order_acquire);

   if (local_top.value_ < local_bot) {
-    return peek_result{entries_[local_top.value_].traded_task_, local_top};
+    return peek_result{entries_[local_top.value_].traded_task_.load(std::memory_order_relaxed), local_top};
   } else {
     return peek_result{nullptr, local_top};
   }
@@ -95,7 +97,7 @@ external_trading_deque::peek_result external_trading_deque::peek_top() {
 task *external_trading_deque::pop_top(task *offered_task, peek_result peek_result) {
   stamped_integer expected_top = peek_result.top_pointer_;
-  auto local_bot = bot_.load();
+  auto local_bot = bot_.load(std::memory_order_acquire);
   if (expected_top.value_ >= local_bot) {
     return nullptr;
   }
@@ -103,8 +105,8 @@ task *external_trading_deque::pop_top(task *offered_task, peek_result peek_resul
   auto &target_entry = entries_[expected_top.value_];

   // Read our potential result
-  task *result = target_entry.traded_task_.load();
-  unsigned long forwarding_stamp = target_entry.forwarding_stamp_.load();
+  task *result = target_entry.traded_task_.load(std::memory_order_relaxed);
+  unsigned long forwarding_stamp = target_entry.forwarding_stamp_.load(std::memory_order_relaxed);

   if (result == nullptr) {
     return nullptr;
@@ -112,7 +114,7 @@ task *external_trading_deque::pop_top(task *offered_task, peek_result peek_resul
   if (forwarding_stamp != expected_top.stamp_) {
     // ...we failed because the top tag lags behind...try to fix it.
     // This means only updating the tag, as this location can still hold data we need.
-    top_.compare_exchange_strong(expected_top, {forwarding_stamp, expected_top.value_});
+    top_.compare_exchange_strong(expected_top, {forwarding_stamp, expected_top.value_}, std::memory_order_relaxed);
     return nullptr;
   }
@@ -123,16 +125,20 @@ task *external_trading_deque::pop_top(task *offered_task, peek_result peek_resul
   traded_cas_field offered_field = expected_sync_cas_field;
   offered_field.fill_with_task(offered_task->thread_id_);

-  if (result->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field, offered_field)) {
+  if (result->external_trading_deque_cas_.compare_exchange_strong(expected_sync_cas_field,
+                                                                  offered_field,
+                                                                  std::memory_order_acq_rel)) {
     // We got it, for sure move the top pointer forward.
-    top_.compare_exchange_strong(expected_top, {expected_top.stamp_ + 1, expected_top.value_ + 1});
+    top_.compare_exchange_strong(expected_top, {expected_top.stamp_ + 1, expected_top.value_ + 1}, std::memory_order_acq_rel);
     return result;
   } else {
     // TODO: Re-Check this condition for forwarding the stamp! Should only happen if another top-stealer took the
     // slot that we were interested in!
     if (expected_sync_cas_field.is_filled_with_object() && expected_sync_cas_field.get_stamp() == expected_top.stamp_
         && expected_sync_cas_field.get_trade_request_thread_id() == thread_id_) {
-      top_.compare_exchange_strong(expected_top, {expected_top.stamp_ + 1, expected_top.value_ + 1});
+      top_.compare_exchange_strong(expected_top, {expected_top.stamp_ + 1, expected_top.value_ + 1}, std::memory_order_relaxed);
     }
     return nullptr;
   }
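The changes in this file only add explicit orderings to atomics that previously used the sequentially consistent default: relaxed for values that are re-validated by a later CAS, and acquire/release where one thread publishes data that another thread reads after observing a flag. A generic (non-repo) illustration of the release/acquire half of that contract:

    #include <atomic>
    #include <thread>

    int payload = 0;
    std::atomic<bool> ready{false};

    void producer() {
      payload = 42;                                  // plain write...
      ready.store(true, std::memory_order_release);  // ...published by the release store
    }

    void consumer() {
      while (!ready.load(std::memory_order_acquire)) {
      }  // the acquire load pairs with the release store
      // payload is now guaranteed to be 42; a relaxed load of `ready`
      // would give no such guarantee about `payload`.
    }

    int main() {
      std::thread t1(producer), t2(consumer);
      t1.join();
      t2.join();
      return 0;
    }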
lib/pls/src/internal/scheduling/lock_free/task.cpp (view file @ 9fa9296a)

@@ -15,14 +15,12 @@ void task::prepare_for_push(unsigned int pushing_thread_id) {
 }

 bool task::push_task_chain(task *spare_task_chain, unsigned pushing_thread_id) {
-  num_resources_++;
-
   PLS_ASSERT(this->thread_id_ != spare_task_chain->thread_id_,
              "Makes no sense to push task onto itself, as it is not clean by definition.");
   PLS_ASSERT(this->depth_ == spare_task_chain->depth_,
              "Must only push tasks with correct depth.");

-  data_structures::stamped_integer current_root;
+  data_structures::stamped_integer current_root = this->resource_stack_root_.load(std::memory_order_relaxed);
   data_structures::stamped_integer target_root;
   data_structures::stamped_integer expected_next_field;
@@ -30,10 +28,8 @@ bool task::push_task_chain(task *spare_task_chain, unsigned pushing_thread_id) {
   expected_next_field.stamp_ = pushing_thread_id + 1;
   expected_next_field.value_ = 0;

-  int iteration = 0;
   do {
-    iteration++;
-    current_root = this->resource_stack_root_.load();
+    // current_root implicitly re-loaded by CAS in loop
     target_root.stamp_ = current_root.stamp_ + 1;
     target_root.value_ = spare_task_chain->thread_id_ + 1;
@@ -50,53 +46,49 @@ bool task::push_task_chain(task *spare_task_chain, unsigned pushing_thread_id) {
       target_next_field.value_ = current_root_task->thread_id_ + 1;
     }

-    if (!spare_task_chain->resource_stack_next_.compare_exchange_strong(expected_next_field, target_next_field)) {
-      num_resources_--;
+    if (!spare_task_chain->resource_stack_next_.compare_exchange_strong(expected_next_field,
+                                                                        target_next_field,
+                                                                        std::memory_order_relaxed)) {
       return false;
     } else {
       expected_next_field = target_next_field;
     }
-  } while (!this->resource_stack_root_.compare_exchange_strong(current_root, target_root));
+  } while (!this->resource_stack_root_.compare_exchange_strong(current_root, target_root, std::memory_order_acq_rel));

   return true;
 }

 void task::reset_task_chain(task *expected_content) {
-  num_resources_--;
-
-  data_structures::stamped_integer current_root = this->resource_stack_root_.load();
+  data_structures::stamped_integer current_root = this->resource_stack_root_.load(std::memory_order_relaxed);
   PLS_ASSERT(current_root.value_ == expected_content->thread_id_ + 1,
              "Must only reset the task chain if we exactly know its state! (current_root.value_)");

   data_structures::stamped_integer target_root;
   target_root.stamp_ = current_root.stamp_ + 1;

-  bool success = this->resource_stack_root_.compare_exchange_strong(current_root, target_root);
-  PLS_ASSERT(success, "Must always succeed in resetting the chain, as we must be the sole one operating on it!");
+  this->resource_stack_root_.store(target_root, std::memory_order_relaxed);
 }

 task *task::pop_task_chain() {
-  data_structures::stamped_integer current_root;
+  data_structures::stamped_integer current_root = this->resource_stack_root_.load(std::memory_order_relaxed);
   data_structures::stamped_integer target_root;
   task *output_task;
   do {
-    current_root = this->resource_stack_root_.load();
+    // current_root implicitly re-loaded by CAS in loop
     if (current_root.value_ == 0) {
       // Empty...
       return nullptr;
     } else {
       // Found something, try to pop it
       auto *current_root_task = find_task(current_root.value_ - 1, this->depth_);
-      auto next_stack_cas = current_root_task->resource_stack_next_.load();
+      auto next_stack_cas = current_root_task->resource_stack_next_.load(std::memory_order_relaxed);

       target_root.stamp_ = current_root.stamp_ + 1;
       target_root.value_ = next_stack_cas.value_;
       output_task = current_root_task;
     }
-  } while (!this->resource_stack_root_.compare_exchange_strong(current_root, target_root));
-
-  PLS_ASSERT(num_resources_.fetch_add(-1) > 0,
-             "Must only return an task from the chain if there are items!");
+  } while (!this->resource_stack_root_.compare_exchange_strong(current_root, target_root, std::memory_order_acq_rel));

   output_task->resource_stack_next_.store({0, 0});
   return output_task;
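The stamp_ + 1 on every successful root update above is the classic versioned-CAS guard against ABA: even if the stack root returns to a previously seen thread id, the bumped stamp makes a stale compare-exchange fail. A self-contained sketch of the idea (not repo code), packing stamp and value into a single 64-bit CAS-able word:

    #include <atomic>
    #include <cstdint>

    // Version-stamped value: the stamp changes on every update, so an
    // A -> B -> A change of the value still fails a stale CAS.
    struct stamped_value {
      std::atomic<uint64_t> word{0};

      static uint64_t pack(uint32_t stamp, uint32_t value) {
        return (uint64_t(stamp) << 32) | value;
      }

      bool try_update(uint32_t expected_value, uint32_t new_value) {
        uint64_t current = word.load(std::memory_order_relaxed);
        if (uint32_t(current) != expected_value) return false;
        uint32_t stamp = uint32_t(current >> 32);
        return word.compare_exchange_strong(current, pack(stamp + 1, new_value),
                                            std::memory_order_acq_rel);
      }
    };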
lib/pls/src/internal/scheduling/lock_free/task_manager.cpp (view file @ 9fa9296a)

@@ -44,7 +44,7 @@ base_task *task_manager::pop_local_task() {
 std::tuple<base_task *, base_task *, bool> task_manager::steal_task(thread_state &stealing_state) {
   PLS_ASSERT(stealing_state.get_active_task()->depth_ == 0, "Must only steal with clean task chain.");
-  PLS_ASSERT(scheduler::check_task_chain(*stealing_state.get_active_task()), "Must only steal with clean task chain.");
+  PLS_ASSERT_EXPENSIVE(scheduler::check_task_chain(*stealing_state.get_active_task()), "Must only steal with clean task chain.");

   auto peek = deque_.peek_top();
   if (peek.top_task_) {
@@ -83,7 +83,6 @@ std::tuple<base_task *, base_task *, bool> task_manager::steal_task(thread_state
       return std::tuple{stolen_task, chain_after_stolen_task, true};
     } else {
-      // TODO: traded task resource_stack_next_ field is de-marked from being mine
       return std::tuple{nullptr, nullptr, false};
     }
   } else {
@@ -94,15 +93,17 @@ std::tuple<base_task *, base_task *, bool> task_manager::steal_task(thread_state
 base_task *task_manager::pop_clean_task_chain(base_task *base_task) {
   task *target_task = static_cast<task *>(base_task);

+  traded_cas_field peeked_task_cas_before, peeked_task_cas_after;
+  peeked_task_cas_after = external_trading_deque::peek_traded_object(target_task);
   while (true) {
     // Try to get a clean resource chain to go back to the main stealing loop
-    auto peeked_task_cas_before = external_trading_deque::peek_traded_object(target_task);
+    peeked_task_cas_before = peeked_task_cas_after;
     task *pop_result = target_task->pop_task_chain();
     if (pop_result) {
       PLS_ASSERT(scheduler::check_task_chain_backward(*pop_result), "Must only pop proper task chains.");
       return pop_result; // Got something, so we are simply done here
     }

-    auto peeked_task_cas_after = external_trading_deque::peek_traded_object(target_task);
+    peeked_task_cas_after = external_trading_deque::peek_traded_object(target_task);
     if (peeked_task_cas_before != peeked_task_cas_after) {
       continue;
lib/pls/src/internal/scheduling/scheduler.cpp (view file @ 9fa9296a)

@@ -5,6 +5,7 @@
 #include "pls/internal/scheduling/strain_local_resource.h"
 #include "pls/internal/build_flavour.h"
 #include "pls/internal/base/error_handling.h"
+#include "pls/internal/base/futex_wrapper.h"

 #include <thread>
@@ -57,7 +58,7 @@ void scheduler::work_thread_work_section() {
 #if PLS_PROFILING_ENABLED
     my_state.get_scheduler().profiler_.stealing_start(my_state.get_thread_id());
 #endif
-    PLS_ASSERT(check_task_chain(*my_state.get_active_task()), "Must start stealing with a clean task chain.");
+    PLS_ASSERT_EXPENSIVE(check_task_chain(*my_state.get_active_task()), "Must start stealing with a clean task chain.");

     size_t target;
     do {
@@ -91,7 +92,7 @@ void scheduler::work_thread_work_section() {
         auto *stolen_resources = stolen_task->attached_resources_.load(std::memory_order_relaxed);
         strain_local_resource::acquire_locally(stolen_resources, my_state.get_thread_id());

-        PLS_ASSERT(check_task_chain_forward(*my_state.get_active_task()),
+        PLS_ASSERT_EXPENSIVE(check_task_chain_forward(*my_state.get_active_task()),
                    "We are sole owner of this chain, it has to be valid!");

         // Execute the stolen task by jumping to its continuation.
@@ -117,12 +118,12 @@ void scheduler::work_thread_work_section() {
       my_state.get_scheduler().profiler_.stealing_end(my_state.get_thread_id(), false);
 #endif
 #if PLS_SLEEP_WORKERS_ON_EMPTY
-      switch (target_queue_empty_flag.value) {
+      switch (target_queue_empty_flag.value_) {
        case EMPTY_QUEUE_STATE::QUEUE_NON_EMPTY: {
          // We found the queue empty, but the flag says it should still be full.
          // We want to declare it empty, but we need to re-check the queue in a sub-step to avoid races.
-         data_structures::stamped_integer maybe_empty_flag{target_queue_empty_flag.stamp + 1, EMPTY_QUEUE_STATE::QUEUE_MAYBE_EMPTY};
+         data_structures::stamped_integer maybe_empty_flag{target_queue_empty_flag.stamp_ + 1, EMPTY_QUEUE_STATE::QUEUE_MAYBE_EMPTY};
          if (target_state.get_queue_empty_flag().compare_exchange_strong(target_queue_empty_flag, maybe_empty_flag)) {
            goto queue_empty_flag_retry_steal;
@@ -133,7 +134,7 @@ void scheduler::work_thread_work_section() {
          // We found the queue empty and it was already marked as maybe empty.
          // We can safely mark it empty and increment the central counter.
-         data_structures::stamped_integer empty_flag{target_queue_empty_flag.stamp + 1, EMPTY_QUEUE_STATE::QUEUE_EMPTY};
+         data_structures::stamped_integer empty_flag{target_queue_empty_flag.stamp_ + 1, EMPTY_QUEUE_STATE::QUEUE_EMPTY};
          if (target_state.get_queue_empty_flag().compare_exchange_strong(target_queue_empty_flag, empty_flag)) {
            // We marked it empty, now it's our duty to modify the central counter
            my_state.get_scheduler().empty_queue_increase_counter();
@@ -216,7 +217,7 @@ context_switcher::continuation scheduler::slow_return(thread_state &calling_stat
                "Resources must only reside in the correct depth!");
     PLS_ASSERT(last_task != clean_chain,
                "We want to swap out the last task and its chain to use a clean one, thus they must differ.");
-    PLS_ASSERT(check_task_chain_backward(*clean_chain),
+    PLS_ASSERT_EXPENSIVE(check_task_chain_backward(*clean_chain),
                "Can only acquire clean chains for clean returns!");

     // Acquire it/merge it with our task chain.