Migrate test/ to absl::AnyInvocable based TaskQueueBase interface

Bug: webrtc:14245
Change-Id: Ib410d1b03a23e5f00927456f7239c0dc7e68b824
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/268184
Reviewed-by: Artem Titov <titovartem@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37497}
This commit is contained in:
Danil Chapovalov 2022-07-07 20:29:30 +02:00 committed by WebRTC LUCI CQ
parent b981394841
commit 9c125c6603
26 changed files with 151 additions and 175 deletions

View File

@ -556,8 +556,8 @@ if (rtc_include_tests && !build_with_chromium) {
"../api:scoped_refptr",
"../api:simulcast_test_fixture_api",
"../api/task_queue:task_queue_test",
"../api/task_queue:to_queued_task",
"../api/test/video:function_video_factory",
"../api/units:time_delta",
"../api/video:encoded_image",
"../api/video:video_frame",
"../api/video_codecs:video_codecs_api",
@ -871,10 +871,11 @@ rtc_library("run_loop") {
"run_loop.h",
]
deps = [
"../api/task_queue:to_queued_task",
"../api/task_queue",
"../rtc_base:threading",
"../rtc_base:timeutils",
]
absl_deps = [ "//third_party/abseil-cpp/absl/functional:any_invocable" ]
}
rtc_library("test_common") {
@ -915,7 +916,6 @@ rtc_library("test_common") {
"../api/rtc_event_log",
"../api/task_queue",
"../api/task_queue:default_task_queue_factory",
"../api/task_queue:to_queued_task",
"../api/test/video:function_video_factory",
"../api/transport:field_trial_based_config",
"../api/video:builtin_video_bitrate_allocator_factory",

View File

@ -61,11 +61,12 @@ int32_t FakeDecoder::Decode(const EncodedImage& input,
if (decode_delay_ms_ == 0 || !task_queue_) {
callback_->Decoded(frame);
} else {
task_queue_->PostDelayedHighPrecisionTask(ToQueuedTask([frame, this]() {
VideoFrame copy = frame;
callback_->Decoded(copy);
}),
decode_delay_ms_);
task_queue_->PostDelayedHighPrecisionTask(
[frame, this]() {
VideoFrame copy = frame;
callback_->Decoded(copy);
},
TimeDelta::Millis(decode_delay_ms_));
}
return WEBRTC_VIDEO_CODEC_OK;
@ -74,9 +75,8 @@ int32_t FakeDecoder::Decode(const EncodedImage& input,
void FakeDecoder::SetDelayedDecoding(int decode_delay_ms) {
RTC_CHECK(task_queue_factory_);
if (!task_queue_) {
task_queue_ =
std::make_unique<rtc::TaskQueue>(task_queue_factory_->CreateTaskQueue(
"fake_decoder", TaskQueueFactory::Priority::NORMAL));
task_queue_ = task_queue_factory_->CreateTaskQueue(
"fake_decoder", TaskQueueFactory::Priority::NORMAL);
}
decode_delay_ms_ = decode_delay_ms;
}

View File

@ -13,11 +13,13 @@
#include <stdint.h>
#include <memory>
#include "api/task_queue/task_queue_base.h"
#include "api/task_queue/task_queue_factory.h"
#include "api/video/encoded_image.h"
#include "api/video_codecs/video_decoder.h"
#include "modules/video_coding/include/video_codec_interface.h"
#include "rtc_base/task_queue.h"
namespace webrtc {
namespace test {
@ -52,7 +54,7 @@ class FakeDecoder : public VideoDecoder {
DecodedImageCallback* callback_;
int width_;
int height_;
std::unique_ptr<rtc::TaskQueue> task_queue_;
std::unique_ptr<TaskQueueBase, TaskQueueDeleter> task_queue_;
TaskQueueFactory* task_queue_factory_;
int decode_delay_ms_;
};

View File

@ -17,7 +17,6 @@
#include <memory>
#include <string>
#include "api/task_queue/queued_task.h"
#include "api/video/video_content_type.h"
#include "modules/video_coding/codecs/h264/include/h264_globals.h"
#include "modules/video_coding/include/video_codec_interface.h"
@ -403,26 +402,6 @@ int32_t MultithreadedFakeH264Encoder::InitEncode(const VideoCodec* config,
return FakeH264Encoder::InitEncode(config, settings);
}
class MultithreadedFakeH264Encoder::EncodeTask : public QueuedTask {
public:
EncodeTask(MultithreadedFakeH264Encoder* encoder,
const VideoFrame& input_image,
const std::vector<VideoFrameType>* frame_types)
: encoder_(encoder),
input_image_(input_image),
frame_types_(*frame_types) {}
private:
bool Run() override {
encoder_->EncodeCallback(input_image_, &frame_types_);
return true;
}
MultithreadedFakeH264Encoder* const encoder_;
VideoFrame input_image_;
std::vector<VideoFrameType> frame_types_;
};
int32_t MultithreadedFakeH264Encoder::Encode(
const VideoFrame& input_image,
const std::vector<VideoFrameType>* frame_types) {
@ -435,7 +414,9 @@ int32_t MultithreadedFakeH264Encoder::Encode(
return WEBRTC_VIDEO_CODEC_UNINITIALIZED;
}
queue->PostTask(std::make_unique<EncodeTask>(this, input_image, frame_types));
queue->PostTask([this, input_image, frame_types = *frame_types] {
EncodeCallback(input_image, &frame_types);
});
return WEBRTC_VIDEO_CODEC_OK;
}

View File

@ -164,8 +164,6 @@ class MultithreadedFakeH264Encoder : public test::FakeH264Encoder {
int32_t Release() override;
protected:
class EncodeTask;
TaskQueueFactory* const task_queue_factory_;
int current_queue_ RTC_GUARDED_BY(sequence_checker_);
std::unique_ptr<TaskQueueBase, TaskQueueDeleter> queue1_

View File

@ -48,7 +48,6 @@ rtc_library("emulated_network") {
"../../api:time_controller",
"../../api/numerics",
"../../api/task_queue:pending_task_safety_flag",
"../../api/task_queue:to_queued_task",
"../../api/test/network_emulation",
"../../api/transport:stun_types",
"../../api/units:data_rate",

View File

@ -143,7 +143,7 @@ TcpMessageRouteImpl::TcpMessageRouteImpl(Clock* clock,
void TcpMessageRouteImpl::SendMessage(size_t size,
std::function<void()> on_received) {
task_queue_->PostTask(
ToQueuedTask([this, size, handler = std::move(on_received)] {
[this, size, handler = std::move(on_received)] {
// If we are currently sending a message we won't reset the connection,
// we'll act as if the messages are sent in the same TCP stream. This is
// intended to simulate recreation of a TCP session for each message
@ -168,7 +168,7 @@ void TcpMessageRouteImpl::SendMessage(size_t size,
}
messages_.emplace_back(message);
SendPackets(clock_->CurrentTime());
}));
});
}
void TcpMessageRouteImpl::OnRequest(TcpPacket packet_info) {
@ -231,11 +231,11 @@ void TcpMessageRouteImpl::SendPackets(Timestamp at_time) {
pending_.pop_front();
request_route_.SendPacket(send.fragment.size, send);
in_flight_.insert({seq_num, send});
task_queue_->PostDelayedTask(ToQueuedTask([this, seq_num] {
HandlePacketTimeout(seq_num,
clock_->CurrentTime());
}),
kPacketTimeout.ms());
task_queue_->PostDelayedTask(
[this, seq_num] {
HandlePacketTimeout(seq_num, clock_->CurrentTime());
},
kPacketTimeout);
}
}

View File

@ -18,7 +18,6 @@
#include "absl/algorithm/container.h"
#include "api/scoped_refptr.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/to_queued_task.h"
#include "rtc_base/logging.h"
#include "rtc_base/thread.h"
@ -109,7 +108,7 @@ void FakeNetworkSocket::OnPacketReceived(EmulatedIpPacket packet) {
SignalReadEvent(this);
RTC_DCHECK(!pending_);
};
thread_->PostTask(ToQueuedTask(alive_, std::move(task)));
thread_->PostTask(SafeTask(alive_, std::move(task)));
socket_server_->WakeUp();
}

View File

@ -348,6 +348,7 @@ if (!build_with_chromium) {
"test_activities_executor.h",
]
deps = [
"../../../api/task_queue",
"../../../api/units:time_delta",
"../../../api/units:timestamp",
"../../../rtc_base:checks",

View File

@ -17,11 +17,12 @@
#include "rtc_base/checks.h"
#include "rtc_base/location.h"
#include "rtc_base/logging.h"
#include "rtc_base/task_queue_for_test.h"
namespace webrtc {
namespace webrtc_pc_e2e {
void TestActivitiesExecutor::Start(TaskQueueForTest* task_queue) {
void TestActivitiesExecutor::Start(TaskQueueBase* task_queue) {
RTC_DCHECK(task_queue);
task_queue_ = task_queue;
MutexLock lock(&lock_);
@ -37,14 +38,12 @@ void TestActivitiesExecutor::Stop() {
// Already stopped or not started.
return;
}
task_queue_->SendTask(
[this]() {
MutexLock lock(&lock_);
for (auto& handle : repeating_task_handles_) {
handle.Stop();
}
},
RTC_FROM_HERE);
SendTask(RTC_FROM_HERE, task_queue_, [this]() {
MutexLock lock(&lock_);
for (auto& handle : repeating_task_handles_) {
handle.Stop();
}
});
task_queue_ = nullptr;
}
@ -83,14 +82,14 @@ void TestActivitiesExecutor::PostActivity(ScheduledActivity activity) {
if (activity.interval) {
if (remaining_delay == TimeDelta::Zero()) {
repeating_task_handles_.push_back(RepeatingTaskHandle::Start(
task_queue_->Get(), [activity, start_time, this]() {
task_queue_, [activity, start_time, this]() {
activity.func(Now() - start_time);
return *activity.interval;
}));
return;
}
repeating_task_handles_.push_back(RepeatingTaskHandle::DelayedStart(
task_queue_->Get(), remaining_delay, [activity, start_time, this]() {
task_queue_, remaining_delay, [activity, start_time, this]() {
activity.func(Now() - start_time);
return *activity.interval;
}));
@ -103,10 +102,9 @@ void TestActivitiesExecutor::PostActivity(ScheduledActivity activity) {
return;
}
task_queue_->PostDelayedTask(ToQueuedTask([activity, start_time, this]() {
activity.func(Now() - start_time);
}),
remaining_delay.ms());
task_queue_->PostDelayedTask(
[activity, start_time, this]() { activity.func(Now() - start_time); },
remaining_delay);
}
Timestamp TestActivitiesExecutor::Now() const {

View File

@ -15,6 +15,7 @@
#include <vector>
#include "absl/types/optional.h"
#include "api/task_queue/task_queue_base.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "rtc_base/synchronization/mutex.h"
@ -33,7 +34,8 @@ class TestActivitiesExecutor {
// Starts scheduled activities according to their schedule. All activities
// that will be scheduled after Start(...) was invoked will be executed
// immediately according to their schedule.
void Start(TaskQueueForTest* task_queue);
void Start(TaskQueueForTest* task_queue) { Start(task_queue->Get()); }
void Start(TaskQueueBase* task_queue);
void Stop();
// Schedule activity to be executed. If test isn't started yet, then activity
@ -61,7 +63,7 @@ class TestActivitiesExecutor {
Clock* const clock_;
TaskQueueForTest* task_queue_;
TaskQueueBase* task_queue_;
Mutex lock_;
// Time when test was started. Minus infinity means that it wasn't started

View File

@ -9,7 +9,6 @@
*/
#include "test/run_loop.h"
#include "api/task_queue/to_queued_task.h"
#include "rtc_base/time_utils.h"
namespace webrtc {
@ -36,8 +35,7 @@ void RunLoop::Quit() {
}
void RunLoop::Flush() {
worker_thread_.PostTask(
ToQueuedTask([this]() { socket_server_.FailNextWait(); }));
worker_thread_.PostTask([this]() { socket_server_.FailNextWait(); });
// If a test clock is used, like with GlobalSimulatedTimeController then the
// thread will loop forever since time never increases. Since the clock is
// simulated, 0ms can be used as the loop delay, which will process all

View File

@ -10,7 +10,10 @@
#ifndef TEST_RUN_LOOP_H_
#define TEST_RUN_LOOP_H_
#include "api/task_queue/to_queued_task.h"
#include <utility>
#include "absl/functional/any_invocable.h"
#include "api/task_queue/task_queue_base.h"
#include "rtc_base/thread.h"
namespace webrtc {
@ -31,16 +34,8 @@ class RunLoop {
void Flush();
// Convenience methods since TaskQueueBase doesn't support this sort of magic.
template <typename Closure>
void PostTask(Closure&& task) {
task_queue()->PostTask(ToQueuedTask(std::forward<Closure>(task)));
}
template <typename Closure>
void PostDelayedTask(Closure&& task, uint32_t milliseconds) {
task_queue()->PostDelayedTask(ToQueuedTask(std::forward<Closure>(task)),
milliseconds);
void PostTask(absl::AnyInvocable<void() &&> task) {
task_queue()->PostTask(std::move(task));
}
private:

View File

@ -10,7 +10,7 @@
#include "test/run_loop.h"
#include "api/task_queue/to_queued_task.h"
#include "api/units/time_delta.h"
#include "rtc_base/task_queue.h"
#include "test/gtest.h"
@ -34,12 +34,12 @@ TEST(RunLoopTest, Flush) {
TEST(RunLoopTest, Delayed) {
test::RunLoop loop;
bool ran = false;
loop.PostDelayedTask(
loop.task_queue()->PostDelayedTask(
[&ran, &loop]() {
ran = true;
loop.Quit();
},
100);
TimeDelta::Millis(100));
loop.Flush();
EXPECT_FALSE(ran);
loop.Run();

View File

@ -88,6 +88,7 @@ if (rtc_include_tests && !build_with_chromium) {
"../../api/audio_codecs:builtin_audio_encoder_factory",
"../../api/rtc_event_log",
"../../api/rtc_event_log:rtc_event_log_factory",
"../../api/task_queue",
"../../api/test/video:function_video_factory",
"../../api/transport:network_control",
"../../api/units:data_rate",
@ -133,7 +134,6 @@ if (rtc_include_tests && !build_with_chromium) {
"../../rtc_base:rtc_event",
"../../rtc_base:rtc_numerics",
"../../rtc_base:rtc_stats_counters",
"../../rtc_base:rtc_task_queue",
"../../rtc_base:safe_minmax",
"../../rtc_base:socket_address",
"../../rtc_base:task_queue_for_test",
@ -150,6 +150,7 @@ if (rtc_include_tests && !build_with_chromium) {
absl_deps = [
"//third_party/abseil-cpp/absl/flags:flag",
"//third_party/abseil-cpp/absl/flags:parse",
"//third_party/abseil-cpp/absl/functional:any_invocable",
"//third_party/abseil-cpp/absl/memory",
"//third_party/abseil-cpp/absl/strings",
"//third_party/abseil-cpp/absl/types:optional",

View File

@ -22,7 +22,6 @@
#include "call/simulated_network.h"
#include "rtc_base/copy_on_write_buffer.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/task_queue.h"
#include "test/network/network_emulation.h"
#include "test/scenario/column_printer.h"
#include "test/scenario/scenario_config.h"

View File

@ -244,30 +244,31 @@ AudioStreamPair* Scenario::CreateAudioStream(
}
void Scenario::Every(TimeDelta interval,
std::function<void(TimeDelta)> function) {
RepeatingTaskHandle::DelayedStart(task_queue_.Get(), interval,
[interval, function] {
function(interval);
return interval;
});
absl::AnyInvocable<void(TimeDelta)> function) {
RepeatingTaskHandle::DelayedStart(
task_queue_.get(), interval,
[interval, function = std::move(function)]() mutable {
function(interval);
return interval;
});
}
void Scenario::Every(TimeDelta interval, std::function<void()> function) {
RepeatingTaskHandle::DelayedStart(task_queue_.Get(), interval,
[interval, function] {
function();
return interval;
});
void Scenario::Every(TimeDelta interval, absl::AnyInvocable<void()> function) {
RepeatingTaskHandle::DelayedStart(
task_queue_.get(), interval,
[interval, function = std::move(function)]() mutable {
function();
return interval;
});
}
void Scenario::Post(std::function<void()> function) {
task_queue_.PostTask(function);
void Scenario::Post(absl::AnyInvocable<void() &&> function) {
task_queue_->PostTask(std::move(function));
}
void Scenario::At(TimeDelta offset, std::function<void()> function) {
void Scenario::At(TimeDelta offset, absl::AnyInvocable<void() &&> function) {
RTC_DCHECK_GT(offset, TimeSinceStart());
task_queue_.PostDelayedTask(ToQueuedTask(std::move(function)),
TimeUntilTarget(offset).ms());
task_queue_->PostDelayedTask(std::move(function), TimeUntilTarget(offset));
}
void Scenario::RunFor(TimeDelta duration) {

View File

@ -14,9 +14,10 @@
#include <utility>
#include <vector>
#include "absl/functional/any_invocable.h"
#include "api/task_queue/task_queue_base.h"
#include "api/test/time_controller.h"
#include "rtc_base/fake_clock.h"
#include "rtc_base/task_queue.h"
#include "rtc_base/task_utils/repeating_task.h"
#include "test/gtest.h"
#include "test/logging/log_writer.h"
@ -102,17 +103,17 @@ class Scenario {
// Runs the provided function with a fixed interval. For real time tests,
// `function` starts being called after `interval` from the call to Every().
void Every(TimeDelta interval, std::function<void(TimeDelta)> function);
void Every(TimeDelta interval, std::function<void()> function);
void Every(TimeDelta interval, absl::AnyInvocable<void(TimeDelta)> function);
void Every(TimeDelta interval, absl::AnyInvocable<void()> function);
// Runs the provided function on the internal task queue. This ensure that
// it's run on the main thread for simulated time tests.
void Post(std::function<void()> function);
void Post(absl::AnyInvocable<void() &&> function);
// Runs the provided function after given duration has passed. For real time
// tests, `function` is called after `target_time_since_start` from the call
// to Every().
void At(TimeDelta offset, std::function<void()> function);
void At(TimeDelta offset, absl::AnyInvocable<void() &&> function);
// Sends a packet over the nodes and runs `action` when it has been delivered.
void NetworkDelayedAction(std::vector<EmulatedNetworkNode*> over_nodes,
@ -179,7 +180,7 @@ class Scenario {
Timestamp start_time_ = Timestamp::PlusInfinity();
// Defined last so it's destroyed first.
rtc::TaskQueue task_queue_;
std::unique_ptr<TaskQueueBase, TaskQueueDeleter> task_queue_;
};
} // namespace test
} // namespace webrtc

View File

@ -28,7 +28,6 @@ rtc_library("time_controller") {
"../../api:time_controller",
"../../api/task_queue",
"../../api/task_queue:default_task_queue_factory",
"../../api/task_queue:to_queued_task",
"../../api/units:time_delta",
"../../api/units:timestamp",
"../../rtc_base",
@ -41,7 +40,10 @@ rtc_library("time_controller") {
"../../rtc_base/synchronization:yield_policy",
"../../system_wrappers",
]
absl_deps = [ "//third_party/abseil-cpp/absl/strings" ]
absl_deps = [
"//third_party/abseil-cpp/absl/functional:any_invocable",
"//third_party/abseil-cpp/absl/strings",
]
}
if (rtc_include_tests) {
@ -56,7 +58,6 @@ if (rtc_include_tests) {
":time_controller",
"../:test_support",
"../../api:time_controller",
"../../api/task_queue:to_queued_task",
"../../api/units:time_delta",
"../../rtc_base",
"../../rtc_base:location",

View File

@ -14,7 +14,7 @@
#include <memory>
#include <utility>
#include "api/task_queue/queued_task.h"
#include "absl/functional/any_invocable.h"
#include "api/task_queue/task_queue_base.h"
#include "api/task_queue/task_queue_factory.h"
#include "api/units/time_delta.h"
@ -33,41 +33,36 @@ class ExternalTimeController::TaskQueueWrapper : public TaskQueueBase {
std::unique_ptr<TaskQueueBase, TaskQueueDeleter> base)
: parent_(parent), base_(std::move(base)) {}
void PostTask(std::unique_ptr<QueuedTask> task) override {
void PostTask(absl::AnyInvocable<void() &&> task) override {
parent_->UpdateTime();
base_->PostTask(std::make_unique<TaskWrapper>(std::move(task), this));
base_->PostTask(TaskWrapper(std::move(task)));
parent_->ScheduleNext();
}
void PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t ms) override {
void PostDelayedTask(absl::AnyInvocable<void() &&> task,
TimeDelta delay) override {
parent_->UpdateTime();
base_->PostDelayedTask(std::make_unique<TaskWrapper>(std::move(task), this),
ms);
base_->PostDelayedTask(TaskWrapper(std::move(task)), delay);
parent_->ScheduleNext();
}
void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
TimeDelta delay) override {
parent_->UpdateTime();
base_->PostDelayedHighPrecisionTask(TaskWrapper(std::move(task)), delay);
parent_->ScheduleNext();
}
void Delete() override { delete this; }
private:
class TaskWrapper : public QueuedTask {
public:
TaskWrapper(std::unique_ptr<QueuedTask> task, TaskQueueWrapper* queue)
: task_(std::move(task)), queue_(queue) {}
bool Run() override {
CurrentTaskQueueSetter current(queue_);
if (!task_->Run()) {
task_.release();
}
// The wrapper should always be deleted, even if it releases the inner
// task, in order to avoid leaking wrappers.
return true;
}
private:
std::unique_ptr<QueuedTask> task_;
TaskQueueWrapper* queue_;
};
absl::AnyInvocable<void() &&> TaskWrapper(
absl::AnyInvocable<void() &&> task) {
return [task = std::move(task), this]() mutable {
CurrentTaskQueueSetter current(this);
std::move(task)();
};
}
ExternalTimeController* const parent_;
std::unique_ptr<TaskQueueBase, TaskQueueDeleter> base_;

View File

@ -29,8 +29,8 @@ SimulatedTaskQueue::~SimulatedTaskQueue() {
void SimulatedTaskQueue::Delete() {
// Need to destroy the tasks outside of the lock because task destruction
// can lead to re-entry in SimulatedTaskQueue via custom destructors.
std::deque<std::unique_ptr<QueuedTask>> ready_tasks;
std::map<Timestamp, std::vector<std::unique_ptr<QueuedTask>>> delayed_tasks;
std::deque<absl::AnyInvocable<void() &&>> ready_tasks;
std::map<Timestamp, std::vector<absl::AnyInvocable<void() &&>>> delayed_tasks;
{
MutexLock lock(&lock_);
ready_tasks_.swap(ready_tasks);
@ -47,20 +47,16 @@ void SimulatedTaskQueue::RunReady(Timestamp at_time) {
it != delayed_tasks_.end() && it->first <= at_time;
it = delayed_tasks_.erase(it)) {
for (auto& task : it->second) {
ready_tasks_.emplace_back(std::move(task));
ready_tasks_.push_back(std::move(task));
}
}
CurrentTaskQueueSetter set_current(this);
while (!ready_tasks_.empty()) {
std::unique_ptr<QueuedTask> ready = std::move(ready_tasks_.front());
absl::AnyInvocable<void()&&> ready = std::move(ready_tasks_.front());
ready_tasks_.pop_front();
lock_.Unlock();
bool delete_task = ready->Run();
if (delete_task) {
ready.reset();
} else {
ready.release();
}
std::move(ready)();
ready = nullptr;
lock_.Lock();
}
if (!delayed_tasks_.empty()) {
@ -70,17 +66,25 @@ void SimulatedTaskQueue::RunReady(Timestamp at_time) {
}
}
void SimulatedTaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
void SimulatedTaskQueue::PostTask(absl::AnyInvocable<void() &&> task) {
MutexLock lock(&lock_);
ready_tasks_.emplace_back(std::move(task));
ready_tasks_.push_back(std::move(task));
next_run_time_ = Timestamp::MinusInfinity();
}
void SimulatedTaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
uint32_t milliseconds) {
void SimulatedTaskQueue::PostDelayedTask(absl::AnyInvocable<void() &&> task,
TimeDelta delay) {
PostDelayedHighPrecisionTask(std::move(task), delay);
}
void SimulatedTaskQueue::PostDelayedHighPrecisionTask(
absl::AnyInvocable<void() &&> task,
TimeDelta delay) {
MutexLock lock(&lock_);
// Some tests start to fail when precision of this simulated task queue is
// changed, thus delay is rounded to milliseconds.
Timestamp target_time =
handler_->CurrentTime() + TimeDelta::Millis(milliseconds);
handler_->CurrentTime() + TimeDelta::Millis(delay.ms());
delayed_tasks_[target_time].push_back(std::move(task));
next_run_time_ = std::min(next_run_time_, target_time);
}

View File

@ -15,6 +15,8 @@
#include <memory>
#include <vector>
#include "absl/functional/any_invocable.h"
#include "api/units/time_delta.h"
#include "rtc_base/synchronization/mutex.h"
#include "test/time_controller/simulated_time_controller.h"
@ -38,9 +40,11 @@ class SimulatedTaskQueue : public TaskQueueBase,
// TaskQueueBase interface
void Delete() override;
void PostTask(std::unique_ptr<QueuedTask> task) override;
void PostDelayedTask(std::unique_ptr<QueuedTask> task,
uint32_t milliseconds) override;
void PostTask(absl::AnyInvocable<void() &&> task) override;
void PostDelayedTask(absl::AnyInvocable<void() &&> task,
TimeDelta delay) override;
void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
TimeDelta delay) override;
private:
sim_time_impl::SimulatedTimeControllerImpl* const handler_;
@ -49,8 +53,8 @@ class SimulatedTaskQueue : public TaskQueueBase,
mutable Mutex lock_;
std::deque<std::unique_ptr<QueuedTask>> ready_tasks_ RTC_GUARDED_BY(lock_);
std::map<Timestamp, std::vector<std::unique_ptr<QueuedTask>>> delayed_tasks_
std::deque<absl::AnyInvocable<void() &&>> ready_tasks_ RTC_GUARDED_BY(lock_);
std::map<Timestamp, std::vector<absl::AnyInvocable<void() &&>>> delayed_tasks_
RTC_GUARDED_BY(lock_);
Timestamp next_run_time_ RTC_GUARDED_BY(lock_) = Timestamp::PlusInfinity();

View File

@ -12,8 +12,6 @@
#include <algorithm>
#include <utility>
#include "api/task_queue/to_queued_task.h"
namespace webrtc {
namespace {

View File

@ -106,11 +106,15 @@ class TokenTaskQueue : public TaskQueueBase {
using CurrentTaskQueueSetter = TaskQueueBase::CurrentTaskQueueSetter;
void Delete() override { RTC_DCHECK_NOTREACHED(); }
void PostTask(std::unique_ptr<QueuedTask> /*task*/) override {
void PostTask(absl::AnyInvocable<void() &&> /*task*/) override {
RTC_DCHECK_NOTREACHED();
}
void PostDelayedTask(std::unique_ptr<QueuedTask> /*task*/,
uint32_t /*milliseconds*/) override {
void PostDelayedTask(absl::AnyInvocable<void() &&> /*task*/,
TimeDelta /*delay*/) override {
RTC_DCHECK_NOTREACHED();
}
void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> /*task*/,
TimeDelta /*delay*/) override {
RTC_DCHECK_NOTREACHED();
}
};

View File

@ -106,22 +106,18 @@ TEST(SimulatedTimeControllerTest, Example) {
task_queue.PostTask(
[handle = std::move(handle)]() mutable { handle.Stop(); });
struct Destructor {
void operator()() { object.reset(); }
std::unique_ptr<ObjectOnTaskQueue> object;
};
task_queue.PostTask(Destructor{std::move(object)});
task_queue.PostTask([object = std::move(object)] {});
}
TEST(SimulatedTimeControllerTest, DelayTaskRunOnTime) {
GlobalSimulatedTimeController time_simulation(kStartTime);
rtc::TaskQueue task_queue(
std::unique_ptr<TaskQueueBase, TaskQueueDeleter> task_queue =
time_simulation.GetTaskQueueFactory()->CreateTaskQueue(
"TestQueue", TaskQueueFactory::Priority::NORMAL));
"TestQueue", TaskQueueFactory::Priority::NORMAL);
bool delay_task_executed = false;
task_queue.PostDelayedTask(ToQueuedTask([&] { delay_task_executed = true; }),
10);
task_queue->PostDelayedTask([&] { delay_task_executed = true; },
TimeDelta::Millis(10));
time_simulation.AdvanceTime(TimeDelta::Millis(10));
EXPECT_TRUE(delay_task_executed);

View File

@ -11,7 +11,6 @@
#include <memory>
#include <vector>
#include "api/task_queue/to_queued_task.h"
#include "api/test/time_controller.h"
#include "api/units/time_delta.h"
#include "rtc_base/event.h"
@ -103,9 +102,9 @@ TEST_P(SimulatedRealTimeControllerConformanceTest, ThreadPostDelayedOrderTest) {
std::unique_ptr<rtc::Thread> thread = time_controller->CreateThread("thread");
ExecutionOrderKeeper execution_order;
thread->PostDelayedTask(ToQueuedTask([&]() { execution_order.Executed(2); }),
/*milliseconds=*/500);
thread->PostTask(ToQueuedTask([&]() { execution_order.Executed(1); }));
thread->PostDelayedTask([&]() { execution_order.Executed(2); },
TimeDelta::Millis(500));
thread->PostTask([&]() { execution_order.Executed(1); });
time_controller->AdvanceTime(TimeDelta::Millis(600));
EXPECT_THAT(execution_order.order(), ElementsAreArray({1, 2}));
// Destroy `thread` before `execution_order` to be sure `execution_order`
@ -161,11 +160,11 @@ TEST_P(SimulatedRealTimeControllerConformanceTest,
// posted/invoked.
ExecutionOrderKeeper execution_order;
rtc::Event event;
task_queue->PostTask(ToQueuedTask([&]() { execution_order.Executed(1); }));
task_queue->PostTask(ToQueuedTask([&]() {
task_queue->PostTask([&]() { execution_order.Executed(1); });
task_queue->PostTask([&]() {
execution_order.Executed(2);
event.Set();
}));
});
EXPECT_TRUE(event.Wait(/*give_up_after_ms=*/100,
/*warn_after_ms=*/10'000));
time_controller->AdvanceTime(TimeDelta::Millis(100));