Commit e403e498 by FritzFlorian

Change both stack and queue to same offset counters.

This allows the stack and deque class to use the same offset, making it work better with each other.
parent a9361609
Pipeline #1294 passed with stages
in 4 minutes 7 seconds
...@@ -80,7 +80,7 @@ int main() { ...@@ -80,7 +80,7 @@ int main() {
pls::internal::helpers::run_mini_benchmark([&] { pls::internal::helpers::run_mini_benchmark([&] {
complex_vector input = initial_input; complex_vector input = initial_input;
fft(input.begin(), input.size()); fft(input.begin(), input.size());
}, 7, 30000); }, 7, 1000);
PROFILE_SAVE("test_profile.prof") PROFILE_SAVE("test_profile.prof")
} }
...@@ -21,7 +21,6 @@ void *allocate_aligned(size_t size); ...@@ -21,7 +21,6 @@ void *allocate_aligned(size_t size);
system_details::pointer_t next_alignment(system_details::pointer_t size); system_details::pointer_t next_alignment(system_details::pointer_t size);
system_details::pointer_t previous_alignment(system_details::pointer_t size); system_details::pointer_t previous_alignment(system_details::pointer_t size);
char *next_alignment(char *pointer);
} }
......
...@@ -27,16 +27,10 @@ using base::system_details::pointer_t; ...@@ -27,16 +27,10 @@ using base::system_details::pointer_t;
* stack.pop<T>(); // Remove the top object of type T * stack.pop<T>(); // Remove the top object of type T
*/ */
class aligned_stack { class aligned_stack {
// Keep bounds of our memory block
pointer_t memory_start_;
pointer_t memory_end_;
// Current head will always be aligned to cache lines
pointer_t head_;
public: public:
typedef pointer_t state; typedef size_t stack_offset;
aligned_stack() : memory_start_{0}, memory_end_{0}, head_{0} {}; aligned_stack() : aligned_memory_start_{0}, aligned_memory_end_{0}, max_offset_{0}, current_offset_{0} {};
aligned_stack(pointer_t memory_region, std::size_t size); aligned_stack(pointer_t memory_region, std::size_t size);
aligned_stack(char *memory_region, std::size_t size); aligned_stack(char *memory_region, std::size_t size);
...@@ -48,8 +42,18 @@ class aligned_stack { ...@@ -48,8 +42,18 @@ class aligned_stack {
template<typename T> template<typename T>
T pop(); T pop();
state save_state() const { return head_; } void *memory_at_offset(stack_offset offset) const;
void reset_state(state new_state) { head_ = new_state; }
stack_offset save_offset() const { return current_offset_; }
void reset_offset(stack_offset new_offset) { current_offset_ = new_offset; }
private:
// Keep bounds of our memory block
pointer_t aligned_memory_start_;
pointer_t aligned_memory_end_;
stack_offset max_offset_;
stack_offset current_offset_;
}; };
} }
......
...@@ -21,8 +21,10 @@ void *aligned_stack::push_bytes() { ...@@ -21,8 +21,10 @@ void *aligned_stack::push_bytes() {
template<typename T> template<typename T>
T aligned_stack::pop() { T aligned_stack::pop() {
head_ = head_ - base::alignment::next_alignment(sizeof(T)); auto num_cache_lines = base::alignment::next_alignment(sizeof(T)) / base::system_details::CACHE_LINE_SIZE;
return *reinterpret_cast<T *>(head_); current_offset_ -= num_cache_lines;
return *reinterpret_cast<T *>(memory_at_offset(current_offset_));
} }
} }
......
...@@ -48,7 +48,7 @@ class locking_deque { ...@@ -48,7 +48,7 @@ class locking_deque {
base::spin_lock lock_; base::spin_lock lock_;
public: public:
using state = aligned_stack::state; using state = aligned_stack::stack_offset;
explicit locking_deque(aligned_stack *stack) explicit locking_deque(aligned_stack *stack)
: stack_{stack}, head_{nullptr}, tail_{nullptr}, lock_{} {} : stack_{stack}, head_{nullptr}, tail_{nullptr}, lock_{} {}
......
...@@ -75,11 +75,11 @@ Item *locking_deque<Item>::pop_head() { ...@@ -75,11 +75,11 @@ Item *locking_deque<Item>::pop_head() {
template<typename Item> template<typename Item>
void locking_deque<Item>::release_memory_until(state state) { void locking_deque<Item>::release_memory_until(state state) {
stack_->reset_state(state); stack_->reset_offset(state);
} }
template<typename Item> template<typename Item>
typename locking_deque<Item>::state locking_deque<Item>::save_state() { typename locking_deque<Item>::state locking_deque<Item>::save_state() {
return stack_->save_state(); return stack_->save_offset();
} }
} }
......
...@@ -17,7 +17,7 @@ using base::system_details::pointer_t; ...@@ -17,7 +17,7 @@ using base::system_details::pointer_t;
// Integer split into two halfs, can be used in CAS operations // Integer split into two halfs, can be used in CAS operations
using data_structures::stamped_integer; using data_structures::stamped_integer;
using offset_t = stamped_integer::member_t; using deque_offset = stamped_integer::member_t;
// Single Item in the deque // Single Item in the deque
class work_stealing_deque_item { class work_stealing_deque_item {
...@@ -29,8 +29,8 @@ class work_stealing_deque_item { ...@@ -29,8 +29,8 @@ class work_stealing_deque_item {
// Pointer to the actual data // Pointer to the actual data
std::atomic<pointer_t> data_; std::atomic<pointer_t> data_;
// Index (relative to stack base) to the next and previous element // Index (relative to stack base) to the next and previous element
std::atomic<offset_t> next_item_; std::atomic<deque_offset> next_item_;
offset_t previous_item_; deque_offset previous_item_;
public: public:
work_stealing_deque_item() : data_{0}, next_item_{}, previous_item_{} {} work_stealing_deque_item() : data_{0}, next_item_{}, previous_item_{} {}
...@@ -45,54 +45,48 @@ class work_stealing_deque_item { ...@@ -45,54 +45,48 @@ class work_stealing_deque_item {
data_ = reinterpret_cast<pointer_t >(data); data_ = reinterpret_cast<pointer_t >(data);
} }
offset_t next_item() const { return next_item_.load(); } deque_offset next_item() const { return next_item_.load(); }
void set_next_item(offset_t next_item) { next_item_ = next_item; } void set_next_item(deque_offset next_item) { next_item_ = next_item; }
offset_t previous_item() const { return previous_item_; } deque_offset previous_item() const { return previous_item_; }
void set_previous_item(offset_t previous_item) { previous_item_ = previous_item; } void set_previous_item(deque_offset previous_item) { previous_item_ = previous_item; }
}; };
static_assert(sizeof(work_stealing_deque_item) < base::system_details::CACHE_LINE_SIZE,
"Work stealing deque relies on memory layout and requires cache lines to be longer than one 'work_stealing_deque_item' instance!");
template<typename Item> template<typename Task>
class work_stealing_deque { class work_stealing_deque {
// Deque 'takes over' stack and handles memory management while in use. // Deque 'takes over' stack and handles memory management while in use.
// At any point in time the deque can stop using more memory and the stack can be used by other entities. // At any point in time the deque can stop using more memory and the stack can be used by other entities.
aligned_stack *stack_; aligned_stack *stack_;
pointer_t base_pointer_;
std::atomic<stamped_integer> head_; std::atomic<stamped_integer> head_;
std::atomic<offset_t> tail_; std::atomic<deque_offset> tail_;
offset_t previous_tail_; deque_offset previous_tail_;
public: Task* last_pushed_task_;
using state = aligned_stack::state;
public:
explicit work_stealing_deque(aligned_stack *stack) : stack_{stack}, explicit work_stealing_deque(aligned_stack *stack) : stack_{stack},
base_pointer_{0},
head_{stamped_integer{0, 0}}, head_{stamped_integer{0, 0}},
tail_{0}, tail_{0},
previous_tail_{0} { previous_tail_{0},
reset_base_pointer(); last_pushed_task_{0} {}
}
template<typename T, typename ...ARGS> template<typename T, typename ...ARGS>
T *push_tail(ARGS &&... args); T *push_task(ARGS &&... args);
template<typename T, typename Function, typename ...ARGS> template<typename T, typename ...ARGS>
T *push_tail_cb(const Function &after_creation, ARGS &&... args); T *push_object(ARGS &&... args);
Item *pop_tail(); void *push_bytes(size_t size);
Item *pop_head(); void publish_last_task();
void release_memory_until(state state); Task *pop_local_task();
state save_state(); Task *pop_external_task();
private: void reset_offset(deque_offset offset);
void reset_base_pointer(); deque_offset save_offset();
work_stealing_deque_item *item_at(offset_t offset);
offset_t current_stack_offset();
template<typename T, typename ...ARGS> private:
std::pair<work_stealing_deque_item, T> *allocate_item(ARGS &&... args); work_stealing_deque_item *item_at(deque_offset offset);
deque_offset current_stack_offset();
}; };
} }
......
...@@ -9,26 +9,23 @@ namespace pls { ...@@ -9,26 +9,23 @@ namespace pls {
namespace internal { namespace internal {
namespace data_structures { namespace data_structures {
template<typename Item> template<typename Task>
void work_stealing_deque<Item>::reset_base_pointer() { work_stealing_deque_item *work_stealing_deque<Task>::item_at(deque_offset offset) {
base_pointer_ = reinterpret_cast<pointer_t >(stack_->save_state()); // Keep the base of our region in the stack return reinterpret_cast<work_stealing_deque_item *>(stack_->memory_at_offset(offset));
} }
template<typename Item> template<typename Task>
work_stealing_deque_item *work_stealing_deque<Item>::item_at(offset_t offset) { deque_offset work_stealing_deque<Task>::current_stack_offset() {
return reinterpret_cast<work_stealing_deque_item *>(base_pointer_ return stack_->save_offset();
+ (base::system_details::CACHE_LINE_SIZE * offset));
} }
template<typename Item> template<typename Task>
offset_t work_stealing_deque<Item>::current_stack_offset() {
return (stack_->save_state() - base_pointer_) / base::system_details::CACHE_LINE_SIZE;
}
template<typename Item>
template<typename T, typename ...ARGS> template<typename T, typename ...ARGS>
std::pair<work_stealing_deque_item, T> *work_stealing_deque<Item>::allocate_item(ARGS &&... args) { T *work_stealing_deque<Task>::push_task(ARGS &&... args) {
// 'Union' type to push both on stack static_assert(std::is_same<Task, T>::value || std::is_base_of<Task, T>::value,
"Must only push types of <Task> onto work_stealing_deque<Task>");
// 'Union' type to push both the task and the deque entry as one part onto the stack
using pair_t = std::pair<work_stealing_deque_item, T>; using pair_t = std::pair<work_stealing_deque_item, T>;
// Allocate space on stack // Allocate space on stack
auto new_pair = reinterpret_cast<pair_t *>(stack_->push_bytes<pair_t>()); auto new_pair = reinterpret_cast<pair_t *>(stack_->push_bytes<pair_t>());
...@@ -36,43 +33,45 @@ std::pair<work_stealing_deque_item, T> *work_stealing_deque<Item>::allocate_item ...@@ -36,43 +33,45 @@ std::pair<work_stealing_deque_item, T> *work_stealing_deque<Item>::allocate_item
new((void *) &(new_pair->first)) work_stealing_deque_item(); new((void *) &(new_pair->first)) work_stealing_deque_item();
new((void *) &(new_pair->second)) T(std::forward<ARGS>(args)...); new((void *) &(new_pair->second)) T(std::forward<ARGS>(args)...);
return new_pair; // Keep reference for later publishing
last_pushed_task_ = &new_pair->second;
// Item is not publicly visible until it is published
return &(new_pair->second);
} }
template<typename Item> template<typename Task>
template<typename T, typename ...ARGS> template<typename T, typename ...ARGS>
T *work_stealing_deque<Item>::push_tail(ARGS &&... args) { T *work_stealing_deque<Task>::push_object(ARGS &&... args) {
return push_tail_cb<T>([](T *) {}, std::forward<ARGS>(args)...); // Simply add data to the stack, do not publish it in any way
return stack_->push<T>(std::forward(args)...);
} }
template<typename Item> template<typename Task>
template<typename T, typename Function, typename ...ARGS> void *work_stealing_deque<Task>::push_bytes(size_t size) {
T *work_stealing_deque<Item>::push_tail_cb(const Function &after_creation, ARGS &&... args) { // Simply add data to the stack, do not publish it in any way
static_assert(std::is_same<Item, T>::value || std::is_base_of<Item, T>::value, return stack_->push_bytes(size);
"Must only push types of <Item> onto work_stealing_deque<Item>"); }
offset_t local_tail = tail_;
auto new_pair = allocate_item<T>(std::forward<ARGS>(args)...); template<typename Task>
after_creation(&(new_pair->second)); // callback for time after creation but before being visible to others void work_stealing_deque<Task>::publish_last_task() {
deque_offset local_tail = tail_;
// Prepare current tail to point to correct next items // Prepare current tail to point to correct next task
auto tail_deque_item = item_at(local_tail); auto tail_deque_item = item_at(local_tail);
tail_deque_item->set_data(&(new_pair->second)); tail_deque_item->set_data(last_pushed_task_);
tail_deque_item->set_next_item(current_stack_offset()); tail_deque_item->set_next_item(current_stack_offset());
tail_deque_item->set_previous_item(previous_tail_); tail_deque_item->set_previous_item(previous_tail_);
previous_tail_ = local_tail; previous_tail_ = local_tail;
// Linearization point, item appears after this write // Linearization point, task appears after this write
offset_t new_tail = current_stack_offset(); deque_offset new_tail = current_stack_offset();
tail_ = new_tail; tail_ = new_tail;
return &(new_pair->second);
} }
template<typename Item> template<typename Task>
Item *work_stealing_deque<Item>::pop_tail() { Task *work_stealing_deque<Task>::pop_local_task() {
offset_t local_tail = tail_; deque_offset local_tail = tail_;
stamped_integer local_head = head_; stamped_integer local_head = head_;
if (local_tail <= local_head.value) { if (local_tail <= local_head.value) {
...@@ -80,7 +79,7 @@ Item *work_stealing_deque<Item>::pop_tail() { ...@@ -80,7 +79,7 @@ Item *work_stealing_deque<Item>::pop_tail() {
} }
work_stealing_deque_item *previous_tail_item = item_at(previous_tail_); work_stealing_deque_item *previous_tail_item = item_at(previous_tail_);
offset_t new_tail = previous_tail_; deque_offset new_tail = previous_tail_;
previous_tail_ = previous_tail_item->previous_item(); previous_tail_ = previous_tail_item->previous_item();
// Publish our wish to set the tail back // Publish our wish to set the tail back
...@@ -89,14 +88,14 @@ Item *work_stealing_deque<Item>::pop_tail() { ...@@ -89,14 +88,14 @@ Item *work_stealing_deque<Item>::pop_tail() {
local_head = head_; // Linearization point, outside knows list is empty local_head = head_; // Linearization point, outside knows list is empty
if (local_head.value < new_tail) { if (local_head.value < new_tail) {
return previous_tail_item->data<Item>(); // Success, enough distance to other threads return previous_tail_item->data<Task>(); // Success, enough distance to other threads
} }
if (local_head.value == new_tail) { if (local_head.value == new_tail) {
stamped_integer new_head = stamped_integer{local_head.stamp + 1, new_tail}; stamped_integer new_head = stamped_integer{local_head.stamp + 1, new_tail};
// Try competing with consumers by updating the head's stamp value // Try competing with consumers by updating the head's stamp value
if (head_.compare_exchange_strong(local_head, new_head)) { if (head_.compare_exchange_strong(local_head, new_head)) {
return previous_tail_item->data<Item>(); // SUCCESS, we won the competition with other threads return previous_tail_item->data<Task>(); // SUCCESS, we won the competition with other threads
} }
} }
...@@ -108,10 +107,10 @@ Item *work_stealing_deque<Item>::pop_tail() { ...@@ -108,10 +107,10 @@ Item *work_stealing_deque<Item>::pop_tail() {
return nullptr; // EMPTY, we lost the competition with other threads return nullptr; // EMPTY, we lost the competition with other threads
} }
template<typename Item> template<typename Task>
Item *work_stealing_deque<Item>::pop_head() { Task *work_stealing_deque<Task>::pop_external_task() {
stamped_integer local_head = head_; stamped_integer local_head = head_;
offset_t local_tail = tail_; deque_offset local_tail = tail_;
if (local_tail <= local_head.value) { if (local_tail <= local_head.value) {
return nullptr; // EMPTY return nullptr; // EMPTY
...@@ -121,8 +120,8 @@ Item *work_stealing_deque<Item>::pop_head() { ...@@ -121,8 +120,8 @@ Item *work_stealing_deque<Item>::pop_head() {
// there has to be a competition over the tail -> the stamp increased and our next // there has to be a competition over the tail -> the stamp increased and our next
// operation will fail anyways! // operation will fail anyways!
work_stealing_deque_item *head_deque_item = item_at(local_head.value); work_stealing_deque_item *head_deque_item = item_at(local_head.value);
offset_t next_item_offset = head_deque_item->next_item(); deque_offset next_item_offset = head_deque_item->next_item();
Item *head_data_item = head_deque_item->data<Item>(); Task *head_data_item = head_deque_item->data<Task>();
// We try to set the head to this new position. // We try to set the head to this new position.
// Possible outcomes: // Possible outcomes:
...@@ -137,26 +136,23 @@ Item *work_stealing_deque<Item>::pop_head() { ...@@ -137,26 +136,23 @@ Item *work_stealing_deque<Item>::pop_head() {
return nullptr; // EMPTY, we lost the competition return nullptr; // EMPTY, we lost the competition
} }
template<typename Item> template<typename Task>
void work_stealing_deque<Item>::release_memory_until(state state) { void work_stealing_deque<Task>::reset_offset(deque_offset offset) {
unsigned long item_offset = (state - base_pointer_) / base::system_details::CACHE_LINE_SIZE; stack_->reset_offset(offset);
stamped_integer local_head = head_; stamped_integer local_head = head_;
offset_t local_tail = tail_; deque_offset local_tail = tail_;
if (offset < local_tail) {
stack_->reset_state(state); tail_ = offset;
if (item_offset < local_tail) {
tail_ = item_offset;
if (local_head.value >= local_tail) { if (local_head.value >= local_tail) {
head_ = stamped_integer{local_head.stamp + 1, item_offset}; head_ = stamped_integer{local_head.stamp + 1, offset};
} }
} }
} }
template<typename Item> template<typename Task>
typename work_stealing_deque<Item>::state work_stealing_deque<Item>::save_state() { deque_offset work_stealing_deque<Task>::save_offset() {
return stack_->save_state(); return current_stack_offset();
} }
} }
......
...@@ -21,7 +21,7 @@ void run_mini_benchmark(const Function &lambda, ...@@ -21,7 +21,7 @@ void run_mini_benchmark(const Function &lambda,
using namespace std; using namespace std;
using namespace pls::internal::scheduling; using namespace pls::internal::scheduling;
malloc_scheduler_memory scheduler_memory{max_threads, 2u << 14}; malloc_scheduler_memory scheduler_memory{max_threads, 2u << 17u};
for (unsigned int num_threads = 1; num_threads <= max_threads; num_threads++) { for (unsigned int num_threads = 1; num_threads <= max_threads; num_threads++) {
scheduler local_scheduler{&scheduler_memory, num_threads}; scheduler local_scheduler{&scheduler_memory, num_threads};
......
...@@ -28,7 +28,7 @@ void scheduler::perform_work(Function work_section) { ...@@ -28,7 +28,7 @@ void scheduler::perform_work(Function work_section) {
// Do work (see if we can remove this duplicated code) // Do work (see if we can remove this duplicated code)
root_task.parent_ = nullptr; root_task.parent_ = nullptr;
root_task.deque_state_ = my_state->deque_.save_state(); root_task.deque_offset_ = my_state->deque_.save_offset();
root_task.execute(); root_task.execute();
work_section_done_ = true; work_section_done_ = true;
......
...@@ -24,7 +24,7 @@ class task { ...@@ -24,7 +24,7 @@ class task {
task *parent_; task *parent_;
// Stack Management (reset stack pointer after wait_for_all() calls) // Stack Management (reset stack pointer after wait_for_all() calls)
data_structures::deque<task>::state deque_state_; data_structures::deque_offset deque_offset_;
protected: protected:
/* /*
...@@ -66,20 +66,36 @@ void task::spawn_child(ARGS &&... args) { ...@@ -66,20 +66,36 @@ void task::spawn_child(ARGS &&... args) {
ref_count_++; ref_count_++;
// Push on our deque // Push on our deque
thread_state::get()->deque_.push_tail_cb<T>([this](T *item) { auto item = thread_state::get()->deque_.push_task<T>(std::forward<ARGS>(args)...);
// Assign forced values (for stack and parent management)
item->parent_ = this; // Assign forced values (for stack and parent management)
item->finished_construction_ = true; item->parent_ = this;
item->deque_state_ = thread_state::get()->deque_.save_state(); item->finished_construction_ = true;
}, std::forward<ARGS>(args)...); item->deque_offset_ = thread_state::get()->deque_.save_offset();
// Make new task visible to others
thread_state::get()->deque_.publish_last_task();
} }
template<typename T, typename ...ARGS> template<typename T, typename ...ARGS>
void task::spawn_child_and_wait(ARGS &&... args) { void task::spawn_child_and_wait(ARGS &&... args) {
static_assert(std::is_base_of<task, typename std::remove_reference<T>::type>::value, "Only pass task subclasses!"); static_assert(std::is_base_of<task, typename std::remove_reference<T>::type>::value, "Only pass task subclasses!");
// TODO: See if we can inline this (avoid counters/deque) while maintaining memory management
spawn_child<T>(std::forward<ARGS>(args)...); spawn_child<T>(std::forward<ARGS>(args)...);
// TODO: Check why 'direct spawn' (even when pushing it onto the tas queue) seems to be slower
// (Also check if it even is slower or if it only appears so on our laptop)
// // Push on our deque
// auto task = thread_state::get()->deque_.push_task<T>(std::forward<ARGS>(args)...);
//
// // Assign forced values (for stack and parent management)
// task->parent_ = nullptr; // ...do not assign this to a parent => it will not notify our reference counter
// task->finished_construction_ = true;
// task->deque_offset_ = thread_state::get()->deque_.save_offset();
//
// // Execute it
// task->execute();
// Wait for the rest of the tasks
wait_for_all(); wait_for_all();
} }
......
...@@ -6,21 +6,28 @@ namespace internal { ...@@ -6,21 +6,28 @@ namespace internal {
namespace data_structures { namespace data_structures {
aligned_stack::aligned_stack(pointer_t memory_region, const std::size_t size) : aligned_stack::aligned_stack(pointer_t memory_region, const std::size_t size) :
memory_start_{memory_region}, aligned_memory_start_{base::alignment::next_alignment(memory_region)},
memory_end_{memory_region + size}, aligned_memory_end_{base::alignment::previous_alignment(memory_region + size)},
head_{base::alignment::next_alignment(memory_start_)} {} max_offset_{(aligned_memory_end_ - aligned_memory_start_) / base::system_details::CACHE_LINE_SIZE},
current_offset_{0} {}
aligned_stack::aligned_stack(char *memory_region, const std::size_t size) : aligned_stack::aligned_stack(char *memory_region, const std::size_t size) :
memory_start_{(pointer_t) memory_region}, aligned_stack((pointer_t) memory_region, size) {}
memory_end_{(pointer_t) memory_region + size},
head_{base::alignment::next_alignment(memory_start_)} {} void *aligned_stack::memory_at_offset(stack_offset offset) const {
const auto byte_offset = offset * base::system_details::CACHE_LINE_SIZE;
return reinterpret_cast<void *>(aligned_memory_start_ + byte_offset);
}
void *aligned_stack::push_bytes(size_t size) { void *aligned_stack::push_bytes(size_t size) {
void *result = reinterpret_cast<void *>(head_); size_t round_up_size = base::alignment::next_alignment(size);
size_t num_cache_lines = round_up_size / base::system_details::CACHE_LINE_SIZE;
void *result = memory_at_offset(current_offset_);
// Move head to next aligned position after new object // Move head to next aligned position after new object
head_ = base::alignment::next_alignment(head_ + size); current_offset_ += num_cache_lines;
if (head_ >= memory_end_) { if (current_offset_ > max_offset_) {
PLS_ERROR("Tried to allocate object on alligned_stack without sufficient memory!"); PLS_ERROR("Tried to allocate object on alligned_stack without sufficient memory!");
} }
......
...@@ -53,7 +53,7 @@ void scheduler::worker_routine() { ...@@ -53,7 +53,7 @@ void scheduler::worker_routine() {
// Main Thread // Main Thread
auto root_task = scheduler->main_thread_root_task_; auto root_task = scheduler->main_thread_root_task_;
root_task->parent_ = nullptr; root_task->parent_ = nullptr;
root_task->deque_state_ = my_state->deque_.save_state(); root_task->deque_offset_ = my_state->deque_.save_offset();
root_task->execute(); root_task->execute();
scheduler->work_section_done_ = true; scheduler->work_section_done_ = true;
...@@ -90,7 +90,7 @@ void scheduler::terminate() { ...@@ -90,7 +90,7 @@ void scheduler::terminate() {
task *scheduler::get_local_task() { task *scheduler::get_local_task() {
PROFILE_STEALING("Get Local Task") PROFILE_STEALING("Get Local Task")
return thread_state::get()->deque_.pop_tail(); return thread_state::get()->deque_.pop_local_task();
} }
task *scheduler::steal_task() { task *scheduler::steal_task() {
...@@ -112,7 +112,7 @@ task *scheduler::steal_task() { ...@@ -112,7 +112,7 @@ task *scheduler::steal_task() {
auto target_state = thread_state_for(target); auto target_state = thread_state_for(target);
// TODO: See if we should re-try popping if it failed due to contention // TODO: See if we should re-try popping if it failed due to contention
auto result = target_state->deque_.pop_head(); auto result = target_state->deque_.pop_external_task();
if (result != nullptr) { if (result != nullptr) {
return result; return result;
} }
...@@ -137,7 +137,7 @@ bool scheduler::try_execute_local() { ...@@ -137,7 +137,7 @@ bool scheduler::try_execute_local() {
bool scheduler::try_execute_stolen() { bool scheduler::try_execute_stolen() {
task *stolen_task = steal_task(); task *stolen_task = steal_task();
if (stolen_task != nullptr) { if (stolen_task != nullptr) {
stolen_task->deque_state_ = thread_state::get()->deque_.save_state(); stolen_task->deque_offset_ = thread_state::get()->deque_.save_offset();
stolen_task->execute(); stolen_task->execute();
return true; return true;
} }
......
...@@ -12,13 +12,13 @@ task::task() : ...@@ -12,13 +12,13 @@ task::task() :
finished_construction_{false}, finished_construction_{false},
ref_count_{0}, ref_count_{0},
parent_{nullptr}, parent_{nullptr},
deque_state_{0} {} deque_offset_{0} {}
void *task::allocate_memory(long size) { void *task::allocate_memory(long size) {
if (finished_construction_) { if (finished_construction_) {
PLS_ERROR("Must not allocate dynamic task memory after it's construction.") PLS_ERROR("Must not allocate dynamic task memory after it's construction.")
} }
return thread_state::get()->task_stack_->push_bytes(size); return thread_state::get()->deque_.push_bytes(size);
} }
void task::execute() { void task::execute() {
...@@ -45,7 +45,7 @@ void task::wait_for_all() { ...@@ -45,7 +45,7 @@ void task::wait_for_all() {
scheduler->try_execute_stolen(); scheduler->try_execute_stolen();
} }
} }
thread_state::get()->deque_.release_memory_until(deque_state_); thread_state::get()->deque_.reset_offset(deque_offset_);
} }
} }
......
...@@ -85,77 +85,98 @@ TEST_CASE("work_stealing_deque functions correctly", "[internal/data_structures/ ...@@ -85,77 +85,98 @@ TEST_CASE("work_stealing_deque functions correctly", "[internal/data_structures/
int one = 1, two = 2, three = 3, four = 4; int one = 1, two = 2, three = 3, four = 4;
SECTION("add and remove items form the tail") { SECTION("add and remove items form the tail") {
deque.push_tail<int>(one); deque.push_task<int>(one);
deque.push_tail<int>(two); deque.publish_last_task();
deque.push_tail<int>(three); deque.push_task<int>(two);
deque.publish_last_task();
REQUIRE(*deque.pop_tail() == three); deque.push_task<int>(three);
REQUIRE(*deque.pop_tail() == two); deque.publish_last_task();
REQUIRE(*deque.pop_tail() == one);
REQUIRE(*deque.pop_local_task() == three);
REQUIRE(*deque.pop_local_task() == two);
REQUIRE(*deque.pop_local_task() == one);
} }
SECTION("handles getting empty by popping the tail correctly") { SECTION("handles getting empty by popping the tail correctly") {
deque.push_tail<int>(one); deque.push_task<int>(one);
REQUIRE(*deque.pop_tail() == one); deque.publish_last_task();
REQUIRE(*deque.pop_local_task() == one);
deque.push_tail<int>(two); deque.push_task<int>(two);
REQUIRE(*deque.pop_tail() == two); deque.publish_last_task();
REQUIRE(*deque.pop_local_task() == two);
} }
SECTION("remove items form the head") { SECTION("remove items form the head") {
deque.push_tail<int>(one); deque.push_task<int>(one);
deque.push_tail<int>(two); deque.publish_last_task();
deque.push_tail<int>(three); deque.push_task<int>(two);
deque.publish_last_task();
REQUIRE(*deque.pop_head() == one); deque.push_task<int>(three);
REQUIRE(*deque.pop_head() == two); deque.publish_last_task();
REQUIRE(*deque.pop_head() == three);
REQUIRE(*deque.pop_external_task() == one);
REQUIRE(*deque.pop_external_task() == two);
REQUIRE(*deque.pop_external_task() == three);
} }
SECTION("handles getting empty by popping the head correctly") { SECTION("handles getting empty by popping the head correctly") {
deque.push_tail<int>(one); deque.push_task<int>(one);
REQUIRE(*deque.pop_head() == one); deque.publish_last_task();
REQUIRE(*deque.pop_external_task() == one);
deque.push_tail<int>(two); deque.push_task<int>(two);
REQUIRE(*deque.pop_head() == two); deque.publish_last_task();
REQUIRE(*deque.pop_external_task() == two);
} }
SECTION("handles getting empty by popping the head and tail correctly") { SECTION("handles getting empty by popping the head and tail correctly") {
deque.push_tail<int>(one); deque.push_task<int>(one);
REQUIRE(*deque.pop_tail() == one); deque.publish_last_task();
REQUIRE(*deque.pop_local_task() == one);
deque.push_tail<int>(two); deque.push_task<int>(two);
REQUIRE(*deque.pop_head() == two); deque.publish_last_task();
REQUIRE(*deque.pop_external_task() == two);
deque.push_tail<int>(three); deque.push_task<int>(three);
REQUIRE(*deque.pop_tail() == three); deque.publish_last_task();
REQUIRE(*deque.pop_local_task() == three);
} }
SECTION("handles jumps bigger 1 correctly") { SECTION("handles jumps bigger 1 correctly") {
deque.push_tail<int>(one); deque.push_task<int>(one);
deque.push_tail<int>(two); deque.publish_last_task();
REQUIRE(*deque.pop_tail() == two); deque.push_task<int>(two);
deque.publish_last_task();
deque.push_tail<int>(three); REQUIRE(*deque.pop_local_task() == two);
deque.push_tail<int>(four);
REQUIRE(*deque.pop_head() == one); deque.push_task<int>(three);
REQUIRE(*deque.pop_head() == three); deque.publish_last_task();
REQUIRE(*deque.pop_head() == four); deque.push_task<int>(four);
deque.publish_last_task();
REQUIRE(*deque.pop_external_task() == one);
REQUIRE(*deque.pop_external_task() == three);
REQUIRE(*deque.pop_external_task() == four);
} }
SECTION("handles stack reset 1 correctly when emptied by tail") { SECTION("handles stack reset 1 correctly when emptied by tail") {
deque.push_tail<int>(one); deque.push_task<int>(one);
auto state = deque.save_state(); deque.publish_last_task();
deque.push_tail<int>(two); auto state = deque.save_offset();
REQUIRE(*deque.pop_tail() == two); deque.push_task<int>(two);
deque.publish_last_task();
deque.release_memory_until(state); REQUIRE(*deque.pop_local_task() == two);
REQUIRE(*deque.pop_tail() == one);
deque.reset_offset(state);
deque.push_tail<int>(three); REQUIRE(*deque.pop_local_task() == one);
deque.push_tail<int>(four);
REQUIRE(*deque.pop_head() == three); deque.push_task<int>(three);
REQUIRE(*deque.pop_tail() == four); deque.publish_last_task();
deque.push_task<int>(four);
deque.publish_last_task();
REQUIRE(*deque.pop_external_task() == three);
REQUIRE(*deque.pop_local_task() == four);
} }
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment