Commit 0141a57a by FritzFlorian

Skecht 'externaly trading task dequeue'.

The deque trades tasks when stealing. Right now only the fast local path is tested and implemented. For the next step to work we also need to add the resource stack and resource tarding to the system.
parent 625836aa
Pipeline #1386 failed with stages
in 24 seconds
...@@ -30,7 +30,7 @@ int pls_fib(int n) { ...@@ -30,7 +30,7 @@ int pls_fib(int n) {
constexpr int MAX_NUM_THREADS = 1; constexpr int MAX_NUM_THREADS = 1;
constexpr int MAX_NUM_TASKS = 64; constexpr int MAX_NUM_TASKS = 64;
constexpr int MAX_STACK_SIZE = 256; constexpr int MAX_STACK_SIZE = 1024;
int main(int argc, char **argv) { int main(int argc, char **argv) {
int num_threads; int num_threads;
......
...@@ -49,21 +49,19 @@ add_library(pls STATIC ...@@ -49,21 +49,19 @@ add_library(pls STATIC
include/pls/internal/helpers/seqence.h include/pls/internal/helpers/seqence.h
include/pls/internal/helpers/member_function.h include/pls/internal/helpers/member_function.h
include/pls/internal/scheduling/parallel_result.h
include/pls/internal/scheduling/thread_state.h include/pls/internal/scheduling/thread_state.h
include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp include/pls/internal/scheduling/scheduler.h src/internal/scheduling/scheduler.cpp
include/pls/internal/scheduling/scheduler_impl.h include/pls/internal/scheduling/scheduler_impl.h
include/pls/internal/scheduling/scheduler_memory.h include/pls/internal/scheduling/scheduler_memory.h
include/pls/internal/scheduling/task_manager.h include/pls/internal/scheduling/task_manager.h
include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp include/pls/internal/scheduling/task.h src/internal/scheduling/task.cpp
include/pls/internal/scheduling/cont_manager.h
include/pls/internal/scheduling/cont.h
include/pls/internal/data_structures/bounded_ws_deque.h include/pls/internal/data_structures/bounded_ws_deque.h
include/pls/internal/data_structures/optional.h include/pls/internal/data_structures/optional.h
include/pls/internal/scheduling/memory_block.h
include/pls/internal/scheduling/thread_state_static.h include/pls/internal/scheduling/thread_state_static.h
src/internal/base/error_handling.cpp src/internal/base/error_handling.cpp
include/pls/internal/data_structures/bounded_trading_deque.h) include/pls/internal/data_structures/bounded_trading_deque.h
include/pls/internal/scheduling/external_trading_deque.h
include/pls/internal/scheduling/traded_cas_field.h)
# Add everything in `./include` to be in the include path of this project # Add everything in `./include` to be in the include path of this project
target_include_directories(pls target_include_directories(pls
......
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
void pls_error(const char *msg); void pls_error(const char *msg);
// TODO: Distinguish between debug/internal asserts and production asserts. // TODO: Distinguish between debug/internal asserts and production asserts.
// TODO: Re-Enable Asserts #define PLS_ASSERT(cond, msg) if (!(cond)) { pls_error(msg); }
#define PLS_ASSERT(cond, msg) //if (!(cond)) { pls_error(msg); }
#endif //PLS_ERROR_HANDLING_H #endif //PLS_ERROR_HANDLING_H
...@@ -39,7 +39,7 @@ using pointer_t = std::uintptr_t; ...@@ -39,7 +39,7 @@ using pointer_t = std::uintptr_t;
* Usually it is sane to assume a pointer can be swapped in a single CAS operation. * Usually it is sane to assume a pointer can be swapped in a single CAS operation.
*/ */
using cas_integer = std::uintptr_t; using cas_integer = std::uintptr_t;
constexpr unsigned long CAS_SIZE = sizeof(cas_integer); constexpr unsigned long CAS_SIZE = sizeof(cas_integer) * 8;
/** /**
* Most processors have 64 byte cache lines (last 6 bit of the address are zero at line beginnings). * Most processors have 64 byte cache lines (last 6 bit of the address are zero at line beginnings).
......
#ifndef PLS_INTERNAL_SCHEDULING_TASK_TRADING_DEQUE_H_
#define PLS_INTERNAL_SCHEDULING_TASK_TRADING_DEQUE_H_
#include <atomic>
#include <tuple>
#include "pls/internal/base/error_handling.h"
#include "pls/internal/base/system_details.h"
#include "pls/internal/data_structures/optional.h"
#include "pls/internal/data_structures/stamped_integer.h"
#include "pls/internal/scheduling/traded_cas_field.h"
#include "pls/internal/scheduling/task.h"
namespace pls {
namespace internal {
namespace scheduling {
struct trading_deque_entry {
std::atomic<task *> traded_task_{nullptr};
std::atomic<unsigned long> forwarding_stamp_{};
};
/**
* A work stealing deque (single produces/consumer at the end, multiple consumers at the start).
* A task object can only be acquired by stealing consumers (from the start),
* when they also offer a task to trade in for it.
*
* The exchange of 'goods' (here tasks) happens atomically at a linearization point.
* This means that the owning thread always gets a tasks for each and every task that was
* successfully stolen.
*
* As each task is associated with memory this suffices to exchange memory blocks needed for execution.
*/
class external_trading_deque {
public:
external_trading_deque(trading_deque_entry *entries, size_t num_entries) :
entries_{entries}, num_entries_{num_entries} {};
/**
* Pushes a task on the bottom of the deque.
* The task itself wil be filled with the unique, synchronizing cas word.
*
* @param published_task The task to publish on the bottom of the deque.
* @return The content of the cas word, can be later used to check if it changed.
*/
traded_cas_field push_bot(task *published_task) {
auto expected_stamp = bot_internal_.stamp;
auto &current_entry = entries_[bot_internal_.value];
// Store stealing information in the task and deque.
// Relaxed is fine for this, as adding elements is synced over the bot pointer.
current_entry.forwarding_stamp_.store(expected_stamp, std::memory_order_relaxed);
traded_cas_field new_cas_field;
new_cas_field.fill_with_stamp(expected_stamp, deque_id_);
published_task->traded_field_.store(new_cas_field, std::memory_order_relaxed);
current_entry.traded_task_.store(published_task, std::memory_order_relaxed);
// Advance the bot pointer. Linearization point for making the task public.
bot_internal_.stamp++;
bot_internal_.value++;
bot_.store(bot_internal_.value, std::memory_order_release);
return new_cas_field;
}
void popped_bot() {
bot_internal_.value--;
bot_.store(bot_internal_.value, std::memory_order_relaxed);
if (bot_internal_.value == 0) {
bot_internal_.stamp++;
top_.store({bot_internal_.stamp, 0}, std::memory_order_release);
}
}
void empty_deque() {
bot_internal_.value = 0;
bot_internal_.stamp++;
// TODO: We might be able to relax memory orderings...
bot_.store(bot_internal_.value);
top_.store(bot_internal_);
}
std::tuple<data_structures::optional<task *>, data_structures::stamped_integer> peek_top() {
auto local_top = top_.load();
auto local_bot = bot_.load();
if (local_top.value >= local_bot) {
return std::make_tuple(data_structures::optional<task *>{}, local_top);
} else {
return std::make_tuple(data_structures::optional<task *>{entries_[local_top.value].traded_task_}, local_top);
}
}
data_structures::optional<task *> pop_top(task *trade_offer, data_structures::stamped_integer local_top) {
auto local_bot = bot_.load();
if (local_top.value >= local_bot) {
return data_structures::optional<task *>{};
}
unsigned long expected_top_stamp = local_top.stamp;
auto &target_entry = entries_[local_top.value];
// Read our potential result
task *result = target_entry.traded_task_.load(std::memory_order_relaxed);
unsigned long forwarding_stamp = target_entry.forwarding_stamp_.load(std::memory_order_relaxed);
// Try to get it by CAS with the expected field entry, giving up our offered_object for it
traded_cas_field expected_field;
expected_field.fill_with_stamp(expected_top_stamp, deque_id_);
traded_cas_field offered_field;
offered_field.fill_with_trade_object(trade_offer);
if (result->traded_field_.compare_exchange_strong(expected_field, offered_field, std::memory_order_acq_rel)) {
// We got it, for sure move the top pointer forward.
top_.compare_exchange_strong(local_top, {local_top.stamp + 1, local_top.value + 1});
// Return the stolen task
return data_structures::optional<task *>{result};
} else {
// We did not get it...help forwarding the top pointer anyway.
if (expected_top_stamp == forwarding_stamp) {
// ...move the pointer forward if someone else put a valid trade object in there.
top_.compare_exchange_strong(local_top, {local_top.stamp + 1, local_top.value + 1});
} else {
// ...we failed because the top tag lags behind...try to fix it.
// This means only updating the tag, as this location can still hold data we need.
top_.compare_exchange_strong(local_top, {forwarding_stamp, local_top.value});
}
return data_structures::optional<task *>{};
}
}
private:
trading_deque_entry *entries_;
size_t num_entries_;
unsigned deque_id_;
alignas(base::system_details::CACHE_LINE_SIZE) std::atomic<data_structures::stamped_integer> top_{{0, 0}};
alignas(base::system_details::CACHE_LINE_SIZE) std::atomic<size_t> bot_{0};
data_structures::stamped_integer bot_internal_{0, 0};
};
template<size_t SIZE>
class static_external_trading_deque {
public:
static_external_trading_deque() : items_{}, deque_{items_.data(), SIZE} {}
external_trading_deque &get_deque() { return deque_; }
private:
std::array<trading_deque_entry, SIZE> items_;
external_trading_deque deque_;
};
}
}
}
#endif //PLS_INTERNAL_SCHEDULING_TASK_TRADING_DEQUE_H_
...@@ -9,6 +9,8 @@ ...@@ -9,6 +9,8 @@
#include "pls/internal/base/system_details.h" #include "pls/internal/base/system_details.h"
#include "pls/internal/scheduling/traded_cas_field.h"
namespace pls { namespace pls {
namespace internal { namespace internal {
namespace scheduling { namespace scheduling {
...@@ -74,16 +76,15 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) task { ...@@ -74,16 +76,15 @@ struct alignas(base::system_details::CACHE_LINE_SIZE) task {
return context_switcher::enter_context(stack_memory_, stack_size_, std::forward<F>(lambda)); return context_switcher::enter_context(stack_memory_, stack_size_, std::forward<F>(lambda));
} }
// TODO: Remove and add proper version // TODO: Proper access control
// Simulate 'fast' syncronization
std::atomic<int> flag_{0};
private:
// Stack/Continuation Management // Stack/Continuation Management
char *stack_memory_; char *stack_memory_;
size_t stack_size_; // TODO: We do not need this in every single task... size_t stack_size_;
context_switcher::continuation continuation_; context_switcher::continuation continuation_;
// Work-Stealing
std::atomic<traded_cas_field> traded_field_{};
// Task Tree (we have a parent that we want to continue when we finish) // Task Tree (we have a parent that we want to continue when we finish)
task *parent_task_; task *parent_task_;
unsigned depth_; unsigned depth_;
......
...@@ -9,8 +9,8 @@ ...@@ -9,8 +9,8 @@
#include "context_switcher/continuation.h" #include "context_switcher/continuation.h"
#include "pls/internal/scheduling/task.h" #include "pls/internal/scheduling/task.h"
#include "pls/internal/scheduling/external_trading_deque.h"
#include "pls/internal/data_structures/bounded_trading_deque.h"
#include "pls/internal/data_structures/aligned_stack.h" #include "pls/internal/data_structures/aligned_stack.h"
namespace pls { namespace pls {
...@@ -26,7 +26,11 @@ class task_manager { ...@@ -26,7 +26,11 @@ class task_manager {
explicit task_manager(task *tasks, explicit task_manager(task *tasks,
data_structures::aligned_stack static_stack_space, data_structures::aligned_stack static_stack_space,
size_t num_tasks, size_t num_tasks,
size_t stack_size) { size_t stack_size,
external_trading_deque &deque) : num_tasks_{num_tasks},
this_thread_tasks_{tasks},
active_task_{&tasks[0]},
deque_{deque} {
for (size_t i = 0; i < num_tasks - 1; i++) { for (size_t i = 0; i < num_tasks - 1; i++) {
tasks[i].init(static_stack_space.push_bytes(stack_size), stack_size, i, 0); tasks[i].init(static_stack_space.push_bytes(stack_size), stack_size, i, 0);
if (i > 0) { if (i > 0) {
...@@ -36,10 +40,6 @@ class task_manager { ...@@ -36,10 +40,6 @@ class task_manager {
tasks[i].set_next(&tasks[i + 1]); tasks[i].set_next(&tasks[i + 1]);
} }
} }
num_tasks_ = num_tasks;
this_thread_tasks_ = tasks;
active_task_ = &tasks[0];
} }
task &get_this_thread_task(size_t depth) { task &get_this_thread_task(size_t depth) {
...@@ -66,18 +66,19 @@ class task_manager { ...@@ -66,18 +66,19 @@ class task_manager {
last_task->set_continuation(std::move(cont)); last_task->set_continuation(std::move(cont));
active_task_ = this_task; active_task_ = this_task;
// TODO: Publish last task on deque (do this properly, but this simulates the fastest possible impl) traded_cas_field expected_cas_value = deque_.push_bot(active_task_);
// last_task->flag_.store(1, std::memory_order_seq_cst); traded_cas_field empty_cas;
lambda(); lambda();
// TODO: Check if task was stolen from deque (do this properly, but this simulates the fastest possible impl) if (active_task_->traded_field_.compare_exchange_strong(expected_cas_value, empty_cas)) {
// if (last_task->flag_.exchange(0, std::memory_order_seq_cst) == 1) { active_task_ = last_task;
active_task_ = last_task; deque_.popped_bot();
return std::move(last_task->get_continuation()); return std::move(last_task->get_continuation());
// } else { } else {
// return context_switcher::continuation{nullptr}; deque_.empty_deque();
// } PLS_ERROR("Slow Path/Stealing not implemented!");
}
}); });
} }
...@@ -86,6 +87,8 @@ class task_manager { ...@@ -86,6 +87,8 @@ class task_manager {
task *this_thread_tasks_; task *this_thread_tasks_;
task *active_task_; task *active_task_;
external_trading_deque &deque_;
}; };
template<size_t NUM_TASKS, size_t STACK_SIZE> template<size_t NUM_TASKS, size_t STACK_SIZE>
...@@ -94,12 +97,15 @@ class static_task_manager { ...@@ -94,12 +97,15 @@ class static_task_manager {
static_task_manager() static_task_manager()
: tasks_{}, : tasks_{},
static_stack_storage_{}, static_stack_storage_{},
task_manager_{tasks_.data(), static_stack_storage_.get_stack(), NUM_TASKS, STACK_SIZE} {}; static_external_trading_deque_{},
task_manager_{tasks_.data(), static_stack_storage_.get_stack(), NUM_TASKS, STACK_SIZE,
static_external_trading_deque_.get_deque()} {};
task_manager &get_task_manager() { return task_manager_; } task_manager &get_task_manager() { return task_manager_; }
private: private:
std::array<task, NUM_TASKS> tasks_; std::array<task, NUM_TASKS> tasks_;
data_structures::static_aligned_stack<NUM_TASKS * STACK_SIZE> static_stack_storage_; data_structures::static_aligned_stack<NUM_TASKS * STACK_SIZE> static_stack_storage_;
static_external_trading_deque<NUM_TASKS> static_external_trading_deque_;
task_manager task_manager_; task_manager task_manager_;
}; };
......
#ifndef PLS_INTERNAL_SCHEDULING_TRADED_CAS_FIELD_H_
#define PLS_INTERNAL_SCHEDULING_TRADED_CAS_FIELD_H_
#include <atomic>
#include "pls/internal/base/error_handling.h"
#include "pls/internal/base/system_details.h"
namespace pls {
namespace internal {
namespace scheduling {
struct task;
struct traded_cas_field {
static_assert(base::system_details::CACHE_LINE_SIZE >= 4,
"Traded objects must not use their last address bits, as we use them for status flags."
"As traded objects are usually cache aligned, we need big enough cache lines.");
// Base size of our CAS integer/pointer
static constexpr unsigned long CAS_SIZE = base::system_details::CAS_SIZE;
// States of the integer (tag indicating current content)
static constexpr unsigned long EMPTY_TAG = 0x0lu;
static constexpr unsigned long STAMP_TAG = 0x1lu;
static constexpr unsigned long TRADE_TAG = 0x2lu;
// Bitmasks and shifts for cas_integer_, two variants:
// cas_integer_ = traded object | tag
// cas_integer_ = stamp | id | tag
static constexpr unsigned long TAG_SIZE = 2ul;
static constexpr unsigned long TAG_BITS = ~((~0x0ul) << TAG_SIZE);
static constexpr unsigned long TRADED_OBJECT_SIZE = CAS_SIZE - TAG_SIZE;
static constexpr unsigned long TRADED_OBJECT_SHIFT = TAG_SIZE;
static constexpr unsigned long TRADE_OBJECT_BITS = ~((~0x0ul) << TRADED_OBJECT_SIZE) << TRADED_OBJECT_SHIFT;
static constexpr unsigned long ID_SIZE = 10ul; // Up to 1024 cores
static constexpr unsigned long ID_SHIFT = TAG_SIZE;
static constexpr unsigned long ID_BITS = ~((~0x0ul) << ID_SIZE) << ID_SHIFT;
static constexpr unsigned long STAMP_SIZE = CAS_SIZE - TAG_SIZE - ID_SIZE;
static constexpr unsigned long STAMP_SHIFT = TAG_SIZE + ID_SIZE;
static constexpr unsigned long STAMP_BITS = ~((~0x0ul) << STAMP_SIZE) << STAMP_SHIFT;
public:
void fill_with_stamp(unsigned long stamp, unsigned long deque_id) {
cas_integer_ = (((stamp << STAMP_SHIFT) & STAMP_BITS) | ((stamp << ID_SHIFT) & ID_BITS) | STAMP_TAG);
}
unsigned long get_stamp() {
PLS_ASSERT(is_filled_with_tag(), "Must only read out the tag when the traded field contains one.");
return (((unsigned long) cas_integer_) & STAMP_BITS) >> STAMP_SHIFT;
}
unsigned long get_deque_id() {
PLS_ASSERT(is_filled_with_tag(), "Must only read out the tag when the traded field contains one.");
return (((unsigned long) cas_integer_) & ID_BITS) >> ID_SHIFT;
}
bool is_filled_with_tag() {
return (((unsigned long) cas_integer_) & TAG_BITS) == STAMP_TAG;
}
void fill_with_trade_object(task *new_task) {
PLS_ASSERT((((unsigned long) new_task) & TAG_BITS) == 0,
"Must only store aligned objects in this data structure (last bits are needed for tag bit)");
cas_integer_ = (((unsigned long) new_task) | TRADE_TAG);
}
task *get_trade_object() {
PLS_ASSERT(is_filled_with_object(), "Must only read out the object when the traded field contains one.");
return reinterpret_cast<task *>(((unsigned long) cas_integer_) & TRADE_OBJECT_BITS);
}
bool is_filled_with_object() {
return (((unsigned long) cas_integer_) & TAG_BITS) == TRADE_TAG;
}
bool is_empty() {
return (((unsigned long) cas_integer_) & TAG_BITS) == EMPTY_TAG;
}
private:
base::system_details::cas_integer cas_integer_{};
};
}
}
}
#endif //PLS_INTERNAL_SCHEDULING_TRADED_CAS_FIELD_H_
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment