diff --git a/core/templates/command_queue_mt.cpp b/core/templates/command_queue_mt.cpp
index 238bf3975c..04a8095f0b 100644
--- a/core/templates/command_queue_mt.cpp
+++ b/core/templates/command_queue_mt.cpp
@@ -70,35 +70,7 @@ CommandQueueMT::SyncSemaphore *CommandQueueMT::_alloc_sync_sem() {
 	return &sync_sems[idx];
 }
 
-bool CommandQueueMT::dealloc_one() {
-tryagain:
-	if (dealloc_ptr == (write_ptr_and_epoch >> 1)) {
-		// The queue is empty
-		return false;
-	}
-
-	uint32_t size = *(uint32_t *)&command_mem[dealloc_ptr];
-
-	if (size == 0) {
-		// End of command buffer wrap down
-		dealloc_ptr = 0;
-		goto tryagain;
-	}
-
-	if (size & 1) {
-		// Still used, nothing can be deallocated
-		return false;
-	}
-
-	dealloc_ptr += (size >> 1) + 8;
-	return true;
-}
-
 CommandQueueMT::CommandQueueMT(bool p_sync) {
-	command_mem_size = GLOBAL_DEF_RST("memory/limits/command_queue/multithreading_queue_size_kb", DEFAULT_COMMAND_MEM_SIZE_KB);
-	ProjectSettings::get_singleton()->set_custom_property_info("memory/limits/command_queue/multithreading_queue_size_kb", PropertyInfo(Variant::INT, "memory/limits/command_queue/multithreading_queue_size_kb", PROPERTY_HINT_RANGE, "1,4096,1,or_greater"));
-	command_mem_size *= 1024;
-	command_mem = (uint8_t *)memalloc(command_mem_size);
 	if (p_sync) {
 		sync = memnew(Semaphore);
 	}
@@ -108,5 +80,4 @@ CommandQueueMT::~CommandQueueMT() {
 	if (sync) {
 		memdelete(sync);
 	}
-	memfree(command_mem);
 }
diff --git a/core/templates/command_queue_mt.h b/core/templates/command_queue_mt.h
index 0012cea72d..acc46da0d5 100644
--- a/core/templates/command_queue_mt.h
+++ b/core/templates/command_queue_mt.h
@@ -34,6 +34,8 @@
 #include "core/os/memory.h"
 #include "core/os/mutex.h"
 #include "core/os/semaphore.h"
+#include "core/string/print_string.h"
+#include "core/templates/local_vector.h"
 #include "core/templates/simple_type.h"
 #include "core/typedefs.h"
 
@@ -334,11 +336,7 @@ class CommandQueueMT {
 		SYNC_SEMAPHORES = 8
 	};
 
-	uint8_t *command_mem = nullptr;
-	uint32_t read_ptr_and_epoch = 0;
-	uint32_t write_ptr_and_epoch = 0;
-	uint32_t dealloc_ptr = 0;
-	uint32_t command_mem_size = 0;
+	LocalVector<uint8_t> command_mem;
 	SyncSemaphore sync_sems[SYNC_SEMAPHORES];
 	Mutex mutex;
 	Semaphore *sync = nullptr;
@@ -346,138 +344,47 @@ class CommandQueueMT {
 	template <class T>
 	T *allocate() {
 		// alloc size is size+T+safeguard
-		uint32_t alloc_size = ((sizeof(T) + 8 - 1) & ~(8 - 1)) + 8;
-
-		// Assert that the buffer is big enough to hold at least two messages.
-		ERR_FAIL_COND_V(alloc_size * 2 + sizeof(uint32_t) > command_mem_size, nullptr);
-
-	tryagain:
-		uint32_t write_ptr = write_ptr_and_epoch >> 1;
-
-		if (write_ptr < dealloc_ptr) {
-			// behind dealloc_ptr, check that there is room
-			if ((dealloc_ptr - write_ptr) <= alloc_size) {
-				// There is no more room, try to deallocate something
-				if (dealloc_one()) {
-					goto tryagain;
-				}
-				return nullptr;
-			}
-		} else {
-			// ahead of dealloc_ptr, check that there is room
-
-			if ((command_mem_size - write_ptr) < alloc_size + sizeof(uint32_t)) {
-				// no room at the end, wrap down;
-
-				if (dealloc_ptr == 0) { // don't want write_ptr to become dealloc_ptr
-
-					// There is no more room, try to deallocate something
-					if (dealloc_one()) {
-						goto tryagain;
-					}
-					return nullptr;
-				}
-
-				// if this happens, it's a bug
-				ERR_FAIL_COND_V((command_mem_size - write_ptr) < 8, nullptr);
-				// zero means, wrap to beginning
-
-				uint32_t *p = (uint32_t *)&command_mem[write_ptr];
-				*p = 1;
-				write_ptr_and_epoch = 0 | (1 & ~write_ptr_and_epoch); // Invert epoch.
-
-				// See if we can get the thread to run and clear up some more space while we wait.
-				// This is required if alloc_size * 2 + 4 > COMMAND_MEM_SIZE
-				if (sync) {
-					sync->post();
-				}
-				goto tryagain;
-			}
-		}
-		// Allocate the size and the 'in use' bit.
-		// First bit used to mark if command is still in use (1)
-		// or if it has been destroyed and can be deallocated (0).
-		uint32_t size = (sizeof(T) + 8 - 1) & ~(8 - 1);
-		uint32_t *p = (uint32_t *)&command_mem[write_ptr];
-		*p = (size << 1) | 1;
-		write_ptr += 8;
-		// allocate the command
-		T *cmd = memnew_placement(&command_mem[write_ptr], T);
-		write_ptr += size;
-		write_ptr_and_epoch = (write_ptr << 1) | (write_ptr_and_epoch & 1);
+		uint32_t alloc_size = ((sizeof(T) + 8 - 1) & ~(8 - 1));
+		uint64_t size = command_mem.size();
+		command_mem.resize(size + alloc_size + 8);
+		*(uint64_t *)&command_mem[size] = alloc_size;
+		T *cmd = memnew_placement(&command_mem[size + 8], T);
 		return cmd;
 	}
 
 	template <class T>
 	T *allocate_and_lock() {
 		lock();
-		T *ret;
-
-		while ((ret = allocate<T>()) == nullptr) {
-			unlock();
-			// sleep a little until fetch happened and some room is made
-			wait_for_flush();
-			lock();
-		}
-
+		T *ret = allocate<T>();
 		return ret;
 	}
 
-	bool flush_one(bool p_lock = true) {
-		if (p_lock) {
-			lock();
-		}
-	tryagain:
+	void _flush() {
+		lock();
 
-		// tried to read an empty queue
-		if (read_ptr_and_epoch == write_ptr_and_epoch) {
-			if (p_lock) {
-				unlock();
-			}
-			return false;
+		uint64_t read_ptr = 0;
+		uint64_t limit = command_mem.size();
+
+		while (read_ptr < limit) {
+			uint64_t size = *(uint64_t *)&command_mem[read_ptr];
+			read_ptr += 8;
+			CommandBase *cmd = reinterpret_cast<CommandBase *>(&command_mem[read_ptr]);
+
+			cmd->call(); //execute the function
+			cmd->post(); //release in case it needs sync/ret
+			cmd->~CommandBase(); //should be done, so erase the command
+
+			read_ptr += size;
 		}
 
-		uint32_t read_ptr = read_ptr_and_epoch >> 1;
-		uint32_t size_ptr = read_ptr;
-		uint32_t size = *(uint32_t *)&command_mem[read_ptr] >> 1;
-
-		if (size == 0) {
-			*(uint32_t *)&command_mem[read_ptr] = 0; // clear in-use bit.
-			//end of ringbuffer, wrap
-			read_ptr_and_epoch = 0 | (1 & ~read_ptr_and_epoch); // Invert epoch.
-			goto tryagain;
-		}
-
-		read_ptr += 8;
-
-		CommandBase *cmd = reinterpret_cast<CommandBase *>(&command_mem[read_ptr]);
-
-		read_ptr += size;
-
-		read_ptr_and_epoch = (read_ptr << 1) | (read_ptr_and_epoch & 1);
-
-		if (p_lock) {
-			unlock();
-		}
-		cmd->call();
-		if (p_lock) {
-			lock();
-		}
-
-		cmd->post();
-		cmd->~CommandBase();
-		*(uint32_t *)&command_mem[size_ptr] &= ~1;
-
-		if (p_lock) {
-			unlock();
-		}
-		return true;
+		command_mem.clear();
+		unlock();
 	}
 
 	void lock();
 	void unlock();
 	void wait_for_flush();
 	SyncSemaphore *_alloc_sync_sem();
-	bool dealloc_one();
 
 public:
 	/* NORMAL PUSH COMMANDS */
@@ -492,23 +399,19 @@ public:
 	DECL_PUSH_AND_SYNC(0)
 	SPACE_SEP_LIST(DECL_PUSH_AND_SYNC, 15)
-	void wait_and_flush_one() {
-		ERR_FAIL_COND(!sync);
-		sync->wait();
-		flush_one();
-	}
-
 	_FORCE_INLINE_ void flush_if_pending() {
-		if (unlikely(read_ptr_and_epoch != write_ptr_and_epoch)) {
-			flush_all();
+		if (unlikely(command_mem.size() > 0)) {
+			_flush();
 		}
 	}
 
 	void flush_all() {
-		//ERR_FAIL_COND(sync);
-		lock();
-		while (flush_one(false)) {
-		}
-		unlock();
+		_flush();
+	}
+
+	void wait_and_flush() {
+		ERR_FAIL_COND(!sync);
+		sync->wait();
+		_flush();
 	}
 
 	CommandQueueMT(bool p_sync);
diff --git a/doc/classes/ProjectSettings.xml b/doc/classes/ProjectSettings.xml
index b74a1f848b..efd4793b4c 100644
--- a/doc/classes/ProjectSettings.xml
+++ b/doc/classes/ProjectSettings.xml
@@ -1139,8 +1139,6 @@
 			Optional name for the 3D render layer 9. If left empty, the layer will display as "Layer 9".
 		</member>
-		<member name="memory/limits/command_queue/multithreading_queue_size_kb" type="int" setter="" getter="" default="256">
-			Godot uses a message queue to defer some function calls. If you run out of space on it (you will see an error), you can increase the size here.
 		</member>
diff --git a/servers/physics_2d/physics_server_2d_wrap_mt.cpp b/servers/physics_2d/physics_server_2d_wrap_mt.cpp
index 790c87cc44..930b19c2cb 100644
--- a/servers/physics_2d/physics_server_2d_wrap_mt.cpp
+++ b/servers/physics_2d/physics_server_2d_wrap_mt.cpp
@@ -56,7 +56,7 @@ void PhysicsServer2DWrapMT::thread_loop() {
 	step_thread_up.set();
 	while (!exit.is_set()) {
 		// flush commands one by one, until exit is requested
-		command_queue.wait_and_flush_one();
+		command_queue.wait_and_flush();
 	}
 
 	command_queue.flush_all(); // flush all
diff --git a/servers/physics_3d/physics_server_3d_wrap_mt.cpp b/servers/physics_3d/physics_server_3d_wrap_mt.cpp
index f73f67a756..0a89c1a9c9 100644
--- a/servers/physics_3d/physics_server_3d_wrap_mt.cpp
+++ b/servers/physics_3d/physics_server_3d_wrap_mt.cpp
@@ -56,7 +56,7 @@ void PhysicsServer3DWrapMT::thread_loop() {
 	step_thread_up = true;
 	while (!exit) {
 		// flush commands one by one, until exit is requested
-		command_queue.wait_and_flush_one();
+		command_queue.wait_and_flush();
 	}
 
 	command_queue.flush_all(); // flush all
diff --git a/servers/rendering/rendering_server_default.cpp b/servers/rendering/rendering_server_default.cpp
index c6fe6a07e0..cd66cd0716 100644
--- a/servers/rendering/rendering_server_default.cpp
+++ b/servers/rendering/rendering_server_default.cpp
@@ -358,7 +358,7 @@ void RenderingServerDefault::_thread_loop() {
 	draw_thread_up.set();
 	while (!exit.is_set()) {
 		// flush commands one by one, until exit is requested
-		command_queue.wait_and_flush_one();
+		command_queue.wait_and_flush();
 	}
 
 	command_queue.flush_all(); // flush all
diff --git a/tests/test_command_queue.h b/tests/test_command_queue.h
index 620fb96985..f0d4569942 100644
--- a/tests/test_command_queue.h
+++ b/tests/test_command_queue.h
@@ -156,7 +156,7 @@ public:
 			command_queue.flush_all();
 		}
 		for (int i = 0; i < message_count_to_read; i++) {
-			command_queue.wait_and_flush_one();
+			command_queue.wait_and_flush();
 		}
 		message_count_to_read = 0;
@@ -276,50 +276,6 @@ TEST_CASE("[CommandQueue] Test Queue Basics") {
 			ProjectSettings::get_singleton()->property_get_revert(COMMAND_QUEUE_SETTING));
 }
 
-TEST_CASE("[CommandQueue] Test Waiting at Queue Full") {
-	const char *COMMAND_QUEUE_SETTING = "memory/limits/command_queue/multithreading_queue_size_kb";
-	ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING, 1);
-	SharedThreadState sts;
-	sts.init_threads();
-
-	int msgs_to_add = 24; // a queue of size 1kB fundamentally cannot fit 24 matrices.
-	for (int i = 0; i < msgs_to_add; i++) {
-		sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC1_TRANSFORM);
-	}
-	sts.writer_threadwork.main_start_work();
-	// If we call main_wait_for_done, we will deadlock. So instead...
-	sts.message_count_to_read = 1;
-	sts.reader_threadwork.main_start_work();
-	sts.reader_threadwork.main_wait_for_done();
-	CHECK_MESSAGE(sts.func1_count == 1,
-			"Reader should have read one message");
-	CHECK_MESSAGE(sts.during_writing,
-			"Writer thread should still be blocked on writing.");
-	sts.message_count_to_read = msgs_to_add - 3;
-	sts.reader_threadwork.main_start_work();
-	sts.reader_threadwork.main_wait_for_done();
-	CHECK_MESSAGE(sts.func1_count >= msgs_to_add - 3,
-			"Reader should have read most messages");
-	sts.writer_threadwork.main_wait_for_done();
-	CHECK_MESSAGE(sts.during_writing == false,
-			"Writer thread should no longer be blocked on writing.");
-	sts.message_count_to_read = 2;
-	sts.reader_threadwork.main_start_work();
-	sts.reader_threadwork.main_wait_for_done();
-	sts.message_count_to_read = -1;
-	sts.reader_threadwork.main_start_work();
-	sts.reader_threadwork.main_wait_for_done();
-	CHECK_MESSAGE(sts.func1_count == msgs_to_add,
-			"Reader should have read all messages");
-
-	sts.destroy_threads();
-
-	CHECK_MESSAGE(sts.func1_count == msgs_to_add,
-			"Reader should have read no additional messages after join");
-	ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING,
-			ProjectSettings::get_singleton()->property_get_revert(COMMAND_QUEUE_SETTING));
-}
-
 TEST_CASE("[CommandQueue] Test Queue Wrapping to same spot.") {
 	const char *COMMAND_QUEUE_SETTING = "memory/limits/command_queue/multithreading_queue_size_kb";
 	ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING, 1);
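
Note (illustration, not part of the patch): the new scheme is simple enough to sketch in isolation. Each push appends an 8-byte size header followed by the placement-constructed command to a growable byte buffer, and a flush walks that buffer front to back, calling and destroying each command before clearing the vector. The sketch below makes assumptions that differ from the patch: std::vector stands in for Godot's LocalVector, CommandBase is a toy base class without sync/return semantics, there is no locking, and the names ToyQueue and PrintCmd are hypothetical.

// Minimal sketch of the append-and-flush queue layout (assumptions noted above).
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <new>
#include <vector>

struct CommandBase {
	virtual void call() = 0;
	virtual ~CommandBase() {}
};

struct ToyQueue {
	std::vector<uint8_t> command_mem; // grows as needed, never wraps

	template <class T>
	T *allocate() {
		// Round the command up to 8 bytes and prepend an 8-byte size header.
		uint64_t alloc_size = (sizeof(T) + 8 - 1) & ~uint64_t(7);
		uint64_t offset = command_mem.size();
		command_mem.resize(offset + alloc_size + 8);
		memcpy(&command_mem[offset], &alloc_size, sizeof(uint64_t)); // store header
		return new (&command_mem[offset + 8]) T; // placement-construct the command
	}

	void flush() {
		// Walk the buffer: [size][command][size][command]...
		uint64_t read_ptr = 0;
		while (read_ptr < command_mem.size()) {
			uint64_t size;
			memcpy(&size, &command_mem[read_ptr], sizeof(uint64_t));
			read_ptr += 8;
			CommandBase *cmd = reinterpret_cast<CommandBase *>(&command_mem[read_ptr]);
			cmd->call(); // execute
			cmd->~CommandBase(); // destroy in place
			read_ptr += size;
		}
		command_mem.clear(); // empty again; capacity is kept for later pushes
	}
};

struct PrintCmd : public CommandBase {
	int value = 0;
	void call() override { printf("%d\n", value); }
};

int main() {
	ToyQueue q;
	q.allocate<PrintCmd>()->value = 42;
	q.flush(); // prints 42
	return 0;
}

Compared with the removed ring buffer there is no wrap-around, no in-use/epoch bookkeeping, and no writer-side blocking when the queue is full; the trade-off is that a burst of commands grows the buffer's capacity, which is only returned when the queue itself is destroyed.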