Remove thread hops from events provided by JsepTransportController.

Events associated with Subscribe* methods in JTC had trampolines that
would use an async invoker to fire the events on the signaling thread.
This was being done for the purposes of PeerConnection but the concept
of a signaling thread is otherwise not applicable to JTC and use of
JTC from PC is inconsistent across threads (as has been flagged in
webrtc:9987).

This change makes all CallbackList members only accessible from the
network thread and moves the signaling thread related work over to
PeerConnection, which makes hops there more visible as well as making
that class easier to refactor for thread efficiency.

This CL removes the AsyncInvoker from JTC (webrtc:12339)

The signaling_thread_ variable is also removed from JTC and more thread
checks added to catch errors.

Bug: webrtc:12427, webrtc:11988, webrtc:12339
Change-Id: Id232aedd00dfd5403b2ba0ca147d3eca7c12c7c5
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/206062
Commit-Queue: Tommi <tommi@webrtc.org>
Reviewed-by: Niels Moller <nisse@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#33195}
This commit is contained in:
Tomas Gunnarsson 2021-02-08 16:00:10 +01:00 committed by Commit Bot
parent f4fa763aee
commit f554b3c577
8 changed files with 263 additions and 269 deletions

View File

@ -62,7 +62,6 @@ class ConnectionContext : public rtc::RefCountInterface {
// Functions called from PeerConnection and friends
SctpTransportFactoryInterface* sctp_transport_factory() const {
RTC_DCHECK_RUN_ON(signaling_thread_);
return sctp_factory_.get();
}
@ -123,8 +122,7 @@ class ConnectionContext : public rtc::RefCountInterface {
RTC_GUARDED_BY(signaling_thread_);
std::unique_ptr<cricket::MediaEngineInterface> media_engine_
RTC_GUARDED_BY(signaling_thread_);
std::unique_ptr<SctpTransportFactoryInterface> const sctp_factory_
RTC_GUARDED_BY(signaling_thread_);
std::unique_ptr<SctpTransportFactoryInterface> const sctp_factory_;
// Accessed both on signaling thread and worker thread.
std::unique_ptr<WebRtcKeyValueConfig> const trials_;
};

View File

@ -84,13 +84,11 @@ webrtc::RTCError VerifyCandidates(const cricket::Candidates& candidates) {
namespace webrtc {
JsepTransportController::JsepTransportController(
rtc::Thread* signaling_thread,
rtc::Thread* network_thread,
cricket::PortAllocator* port_allocator,
AsyncResolverFactory* async_resolver_factory,
Config config)
: signaling_thread_(signaling_thread),
network_thread_(network_thread),
: network_thread_(network_thread),
port_allocator_(port_allocator),
async_resolver_factory_(async_resolver_factory),
config_(config),
@ -222,12 +220,6 @@ void JsepTransportController::SetNeedsIceRestartFlag() {
bool JsepTransportController::NeedsIceRestart(
const std::string& transport_name) const {
if (!network_thread_->IsCurrent()) {
RTC_DCHECK_RUN_ON(signaling_thread_);
return network_thread_->Invoke<bool>(
RTC_FROM_HERE, [&] { return NeedsIceRestart(transport_name); });
}
RTC_DCHECK_RUN_ON(network_thread_);
const cricket::JsepTransport* transport =
@ -414,11 +406,6 @@ RTCError JsepTransportController::RemoveRemoteCandidates(
bool JsepTransportController::GetStats(const std::string& transport_name,
cricket::TransportStats* stats) {
if (!network_thread_->IsCurrent()) {
return network_thread_->Invoke<bool>(
RTC_FROM_HERE, [=] { return GetStats(transport_name, stats); });
}
RTC_DCHECK_RUN_ON(network_thread_);
cricket::JsepTransport* transport = GetJsepTransportByName(transport_name);
@ -1194,35 +1181,24 @@ void JsepTransportController::OnTransportCandidateGathered_n(
RTC_NOTREACHED();
return;
}
std::string transport_name = transport->transport_name();
// TODO(bugs.webrtc.org/12427): See if we can get rid of this. We should be
// able to just call this directly here.
invoker_.AsyncInvoke<void>(
RTC_FROM_HERE, signaling_thread_, [this, transport_name, candidate] {
signal_ice_candidates_gathered_.Send(
transport_name, std::vector<cricket::Candidate>{candidate});
});
signal_ice_candidates_gathered_.Send(
transport->transport_name(), std::vector<cricket::Candidate>{candidate});
}
void JsepTransportController::OnTransportCandidateError_n(
cricket::IceTransportInternal* transport,
const cricket::IceCandidateErrorEvent& event) {
invoker_.AsyncInvoke<void>(RTC_FROM_HERE, signaling_thread_, [this, event] {
signal_ice_candidate_error_.Send(event);
});
signal_ice_candidate_error_.Send(event);
}
void JsepTransportController::OnTransportCandidatesRemoved_n(
cricket::IceTransportInternal* transport,
const cricket::Candidates& candidates) {
invoker_.AsyncInvoke<void>(
RTC_FROM_HERE, signaling_thread_,
[this, candidates] { signal_ice_candidates_removed_.Send(candidates); });
signal_ice_candidates_removed_.Send(candidates);
}
void JsepTransportController::OnTransportCandidatePairChanged_n(
const cricket::CandidatePairChangeEvent& event) {
invoker_.AsyncInvoke<void>(RTC_FROM_HERE, signaling_thread_, [this, event] {
signal_ice_candidate_pair_changed_.Send(event);
});
signal_ice_candidate_pair_changed_.Send(event);
}
void JsepTransportController::OnTransportRoleConflict_n(
@ -1298,10 +1274,7 @@ void JsepTransportController::UpdateAggregateStates_n() {
if (ice_connection_state_ != new_connection_state) {
ice_connection_state_ = new_connection_state;
invoker_.AsyncInvoke<void>(
RTC_FROM_HERE, signaling_thread_, [this, new_connection_state] {
signal_ice_connection_state_.Send(new_connection_state);
});
signal_ice_connection_state_.Send(new_connection_state);
}
// Compute the current RTCIceConnectionState as described in
@ -1357,17 +1330,11 @@ void JsepTransportController::UpdateAggregateStates_n() {
new_ice_connection_state ==
PeerConnectionInterface::kIceConnectionCompleted) {
// Ensure that we never skip over the "connected" state.
invoker_.AsyncInvoke<void>(RTC_FROM_HERE, signaling_thread_, [this] {
signal_standardized_ice_connection_state_.Send(
PeerConnectionInterface::kIceConnectionConnected);
});
signal_standardized_ice_connection_state_.Send(
PeerConnectionInterface::kIceConnectionConnected);
}
standardized_ice_connection_state_ = new_ice_connection_state;
invoker_.AsyncInvoke<void>(RTC_FROM_HERE, signaling_thread_,
[this, new_ice_connection_state] {
signal_standardized_ice_connection_state_.Send(
new_ice_connection_state);
});
signal_standardized_ice_connection_state_.Send(new_ice_connection_state);
}
// Compute the current RTCPeerConnectionState as described in
@ -1418,10 +1385,7 @@ void JsepTransportController::UpdateAggregateStates_n() {
if (combined_connection_state_ != new_combined_state) {
combined_connection_state_ = new_combined_state;
invoker_.AsyncInvoke<void>(
RTC_FROM_HERE, signaling_thread_, [this, new_combined_state] {
signal_connection_state_.Send(new_combined_state);
});
signal_connection_state_.Send(new_combined_state);
}
// Compute the gathering state.
@ -1434,10 +1398,7 @@ void JsepTransportController::UpdateAggregateStates_n() {
}
if (ice_gathering_state_ != new_gathering_state) {
ice_gathering_state_ = new_gathering_state;
invoker_.AsyncInvoke<void>(
RTC_FROM_HERE, signaling_thread_, [this, new_gathering_state] {
signal_ice_gathering_state_.Send(new_gathering_state);
});
signal_ice_gathering_state_.Send(new_gathering_state);
}
}

View File

@ -54,7 +54,6 @@
#include "pc/session_description.h"
#include "pc/srtp_transport.h"
#include "pc/transport_stats.h"
#include "rtc_base/async_invoker.h"
#include "rtc_base/callback_list.h"
#include "rtc_base/constructor_magic.h"
#include "rtc_base/copy_on_write_buffer.h"
@ -137,10 +136,11 @@ class JsepTransportController : public sigslot::has_slots<> {
std::function<void(const rtc::SSLHandshakeError)> on_dtls_handshake_error_;
};
// The ICE related events are signaled on the |signaling_thread|.
// All the transport related methods are called on the |network_thread|.
JsepTransportController(rtc::Thread* signaling_thread,
rtc::Thread* network_thread,
// The ICE related events are fired on the |network_thread|.
// All the transport related methods are called on the |network_thread|
// and destruction of the JsepTransportController must occur on the
// |network_thread|.
JsepTransportController(rtc::Thread* network_thread,
cricket::PortAllocator* port_allocator,
AsyncResolverFactory* async_resolver_factory,
Config config);
@ -227,26 +227,28 @@ class JsepTransportController : public sigslot::has_slots<> {
// F: void(const std::string&, const std::vector<cricket::Candidate>&)
template <typename F>
void SubscribeIceCandidateGathered(F&& callback) {
// TODO(bugs.webrtc.org/12427): Post this subscription to the network
// thread.
RTC_DCHECK_RUN_ON(network_thread_);
signal_ice_candidates_gathered_.AddReceiver(std::forward<F>(callback));
}
// F: void(cricket::IceConnectionState)
template <typename F>
void SubscribeIceConnectionState(F&& callback) {
RTC_DCHECK_RUN_ON(network_thread_);
signal_ice_connection_state_.AddReceiver(std::forward<F>(callback));
}
// F: void(PeerConnectionInterface::PeerConnectionState)
template <typename F>
void SubscribeConnectionState(F&& callback) {
RTC_DCHECK_RUN_ON(network_thread_);
signal_connection_state_.AddReceiver(std::forward<F>(callback));
}
// F: void(PeerConnectionInterface::IceConnectionState)
template <typename F>
void SubscribeStandardizedIceConnectionState(F&& callback) {
RTC_DCHECK_RUN_ON(network_thread_);
signal_standardized_ice_connection_state_.AddReceiver(
std::forward<F>(callback));
}
@ -254,60 +256,65 @@ class JsepTransportController : public sigslot::has_slots<> {
// F: void(cricket::IceGatheringState)
template <typename F>
void SubscribeIceGatheringState(F&& callback) {
RTC_DCHECK_RUN_ON(network_thread_);
signal_ice_gathering_state_.AddReceiver(std::forward<F>(callback));
}
// F: void(const cricket::IceCandidateErrorEvent&)
template <typename F>
void SubscribeIceCandidateError(F&& callback) {
RTC_DCHECK_RUN_ON(network_thread_);
signal_ice_candidate_error_.AddReceiver(std::forward<F>(callback));
}
// F: void(const std::vector<cricket::Candidate>&)
template <typename F>
void SubscribeIceCandidatesRemoved(F&& callback) {
RTC_DCHECK_RUN_ON(network_thread_);
signal_ice_candidates_removed_.AddReceiver(std::forward<F>(callback));
}
// F: void(const cricket::CandidatePairChangeEvent&)
template <typename F>
void SubscribeIceCandidatePairChanged(F&& callback) {
RTC_DCHECK_RUN_ON(network_thread_);
signal_ice_candidate_pair_changed_.AddReceiver(std::forward<F>(callback));
}
private:
// All of these callbacks are fired on the signaling thread.
// All of these callbacks are fired on the network thread.
// If any transport failed => failed,
// Else if all completed => completed,
// Else if all connected => connected,
// Else => connecting
CallbackList<cricket::IceConnectionState> signal_ice_connection_state_;
CallbackList<cricket::IceConnectionState> signal_ice_connection_state_
RTC_GUARDED_BY(network_thread_);
CallbackList<PeerConnectionInterface::PeerConnectionState>
signal_connection_state_;
signal_connection_state_ RTC_GUARDED_BY(network_thread_);
CallbackList<PeerConnectionInterface::IceConnectionState>
signal_standardized_ice_connection_state_;
signal_standardized_ice_connection_state_ RTC_GUARDED_BY(network_thread_);
// If all transports done gathering => complete,
// Else if any are gathering => gathering,
// Else => new
CallbackList<cricket::IceGatheringState> signal_ice_gathering_state_;
CallbackList<cricket::IceGatheringState> signal_ice_gathering_state_
RTC_GUARDED_BY(network_thread_);
// [mid, candidates]
// TODO(bugs.webrtc.org/12427): Protect this with network_thread_.
CallbackList<const std::string&, const std::vector<cricket::Candidate>&>
signal_ice_candidates_gathered_;
signal_ice_candidates_gathered_ RTC_GUARDED_BY(network_thread_);
CallbackList<const cricket::IceCandidateErrorEvent&>
signal_ice_candidate_error_;
signal_ice_candidate_error_ RTC_GUARDED_BY(network_thread_);
CallbackList<const std::vector<cricket::Candidate>&>
signal_ice_candidates_removed_;
signal_ice_candidates_removed_ RTC_GUARDED_BY(network_thread_);
CallbackList<const cricket::CandidatePairChangeEvent&>
signal_ice_candidate_pair_changed_;
signal_ice_candidate_pair_changed_ RTC_GUARDED_BY(network_thread_);
RTCError ApplyDescription_n(bool local,
SdpType type,
@ -452,7 +459,6 @@ class JsepTransportController : public sigslot::has_slots<> {
void OnDtlsHandshakeError(rtc::SSLHandshakeError error);
rtc::Thread* const signaling_thread_ = nullptr;
rtc::Thread* const network_thread_ = nullptr;
cricket::PortAllocator* const port_allocator_ = nullptr;
AsyncResolverFactory* const async_resolver_factory_ = nullptr;
@ -490,7 +496,6 @@ class JsepTransportController : public sigslot::has_slots<> {
cricket::IceRole ice_role_ = cricket::ICEROLE_CONTROLLING;
uint64_t ice_tiebreaker_ = rtc::CreateRandomId64();
rtc::scoped_refptr<rtc::RTCCertificate> certificate_;
rtc::AsyncInvoker invoker_;
RTC_DISALLOW_COPY_AND_ASSIGN(JsepTransportController);
};

View File

@ -74,7 +74,6 @@ class JsepTransportControllerTest : public JsepTransportController::Observer,
void CreateJsepTransportController(
JsepTransportController::Config config,
rtc::Thread* signaling_thread = rtc::Thread::Current(),
rtc::Thread* network_thread = rtc::Thread::Current(),
cricket::PortAllocator* port_allocator = nullptr) {
config.transport_observer = this;
@ -84,9 +83,10 @@ class JsepTransportControllerTest : public JsepTransportController::Observer,
config.dtls_transport_factory = fake_dtls_transport_factory_.get();
config.on_dtls_handshake_error_ = [](rtc::SSLHandshakeError s) {};
transport_controller_ = std::make_unique<JsepTransportController>(
signaling_thread, network_thread, port_allocator,
nullptr /* async_resolver_factory */, config);
ConnectTransportControllerSignals();
network_thread, port_allocator, nullptr /* async_resolver_factory */,
config);
network_thread->Invoke<void>(RTC_FROM_HERE,
[&] { ConnectTransportControllerSignals(); });
}
void ConnectTransportControllerSignals() {
@ -276,18 +276,14 @@ class JsepTransportControllerTest : public JsepTransportController::Observer,
protected:
void OnConnectionState(cricket::IceConnectionState state) {
if (!signaling_thread_->IsCurrent()) {
signaled_on_non_signaling_thread_ = true;
}
ice_signaled_on_thread_ = rtc::Thread::Current();
connection_state_ = state;
++connection_state_signal_count_;
}
void OnStandardizedIceConnectionState(
PeerConnectionInterface::IceConnectionState state) {
if (!signaling_thread_->IsCurrent()) {
signaled_on_non_signaling_thread_ = true;
}
ice_signaled_on_thread_ = rtc::Thread::Current();
ice_connection_state_ = state;
++ice_connection_state_signal_count_;
}
@ -296,26 +292,20 @@ class JsepTransportControllerTest : public JsepTransportController::Observer,
PeerConnectionInterface::PeerConnectionState state) {
RTC_LOG(LS_INFO) << "OnCombinedConnectionState: "
<< static_cast<int>(state);
if (!signaling_thread_->IsCurrent()) {
signaled_on_non_signaling_thread_ = true;
}
ice_signaled_on_thread_ = rtc::Thread::Current();
combined_connection_state_ = state;
++combined_connection_state_signal_count_;
}
void OnGatheringState(cricket::IceGatheringState state) {
if (!signaling_thread_->IsCurrent()) {
signaled_on_non_signaling_thread_ = true;
}
ice_signaled_on_thread_ = rtc::Thread::Current();
gathering_state_ = state;
++gathering_state_signal_count_;
}
void OnCandidatesGathered(const std::string& transport_name,
const Candidates& candidates) {
if (!signaling_thread_->IsCurrent()) {
signaled_on_non_signaling_thread_ = true;
}
ice_signaled_on_thread_ = rtc::Thread::Current();
candidates_[transport_name].insert(candidates_[transport_name].end(),
candidates.begin(), candidates.end());
++candidates_signal_count_;
@ -360,7 +350,7 @@ class JsepTransportControllerTest : public JsepTransportController::Observer,
std::unique_ptr<FakeIceTransportFactory> fake_ice_transport_factory_;
std::unique_ptr<FakeDtlsTransportFactory> fake_dtls_transport_factory_;
rtc::Thread* const signaling_thread_ = nullptr;
bool signaled_on_non_signaling_thread_ = false;
rtc::Thread* ice_signaled_on_thread_ = nullptr;
// Used to verify the SignalRtpTransportChanged/SignalDtlsTransportChanged are
// signaled correctly.
std::map<std::string, RtpTransportInternal*> changed_rtp_transport_by_mid_;
@ -883,11 +873,12 @@ TEST_F(JsepTransportControllerTest, SignalCandidatesGathered) {
EXPECT_EQ(1u, candidates_[kAudioMid1].size());
}
TEST_F(JsepTransportControllerTest, IceSignalingOccursOnSignalingThread) {
TEST_F(JsepTransportControllerTest, IceSignalingOccursOnNetworkThread) {
network_thread_ = rtc::Thread::CreateWithSocketServer();
network_thread_->Start();
EXPECT_EQ(ice_signaled_on_thread_, nullptr);
CreateJsepTransportController(JsepTransportController::Config(),
signaling_thread_, network_thread_.get(),
network_thread_.get(),
/*port_allocator=*/nullptr);
CreateLocalDescriptionAndCompleteConnectionOnNetworkThread();
@ -903,7 +894,7 @@ TEST_F(JsepTransportControllerTest, IceSignalingOccursOnSignalingThread) {
EXPECT_EQ_WAIT(1u, candidates_[kVideoMid1].size(), kTimeout);
EXPECT_EQ(2, candidates_signal_count_);
EXPECT_TRUE(!signaled_on_non_signaling_thread_);
EXPECT_EQ(ice_signaled_on_thread_, network_thread_.get());
network_thread_->Invoke<void>(RTC_FROM_HERE,
[&] { transport_controller_.reset(); });

View File

@ -88,7 +88,6 @@ const char kSimulcastNumberOfEncodings[] =
static const int REPORT_USAGE_PATTERN_DELAY_MS = 60000;
uint32_t ConvertIceTransportTypeToCandidateFilter(
PeerConnectionInterface::IceTransportsType type) {
switch (type) {
@ -264,6 +263,20 @@ bool HasRtcpMuxEnabled(const cricket::ContentInfo* content) {
return content->media_description()->rtcp_mux();
}
bool DtlsEnabled(const PeerConnectionInterface::RTCConfiguration& configuration,
const PeerConnectionFactoryInterface::Options& options,
const PeerConnectionDependencies& dependencies) {
if (options.disable_encryption)
return false;
// Enable DTLS by default if we have an identity store or a certificate.
bool default_enabled =
(dependencies.cert_generator || !configuration.certificates.empty());
// The |configuration| can override the default value.
return configuration.enable_dtls_srtp.value_or(default_enabled);
}
} // namespace
bool PeerConnectionInterface::RTCConfiguration::operator==(
@ -421,11 +434,12 @@ RTCErrorOr<rtc::scoped_refptr<PeerConnection>> PeerConnection::Create(
bool is_unified_plan =
configuration.sdp_semantics == SdpSemantics::kUnifiedPlan;
bool dtls_enabled = DtlsEnabled(configuration, options, dependencies);
// The PeerConnection constructor consumes some, but not all, dependencies.
rtc::scoped_refptr<PeerConnection> pc(
new rtc::RefCountedObject<PeerConnection>(
context, options, is_unified_plan, std::move(event_log),
std::move(call), dependencies));
std::move(call), dependencies, dtls_enabled));
RTCError init_error = pc->Initialize(configuration, std::move(dependencies));
if (!init_error.ok()) {
RTC_LOG(LS_ERROR) << "PeerConnection initialization failed";
@ -440,7 +454,8 @@ PeerConnection::PeerConnection(
bool is_unified_plan,
std::unique_ptr<RtcEventLog> event_log,
std::unique_ptr<Call> call,
PeerConnectionDependencies& dependencies)
PeerConnectionDependencies& dependencies,
bool dtls_enabled)
: context_(context),
options_(options),
observer_(dependencies.observer),
@ -453,9 +468,17 @@ PeerConnection::PeerConnection(
tls_cert_verifier_(std::move(dependencies.tls_cert_verifier)),
call_(std::move(call)),
call_ptr_(call_.get()),
dtls_enabled_(dtls_enabled),
data_channel_controller_(this),
message_handler_(signaling_thread()),
weak_factory_(this) {}
weak_factory_(this) {
worker_thread()->Invoke<void>(RTC_FROM_HERE, [this] {
RTC_DCHECK_RUN_ON(worker_thread());
worker_thread_safety_ = PendingTaskSafetyFlag::Create();
if (!call_)
worker_thread_safety_->SetNotAlive();
});
}
PeerConnection::~PeerConnection() {
TRACE_EVENT0("webrtc", "PeerConnection::~PeerConnection");
@ -496,15 +519,13 @@ PeerConnection::~PeerConnection() {
RTC_DCHECK_RUN_ON(network_thread());
transport_controller_.reset();
port_allocator_.reset();
if (network_thread_safety_) {
if (network_thread_safety_)
network_thread_safety_->SetNotAlive();
network_thread_safety_ = nullptr;
}
});
// call_ and event_log_ must be destroyed on the worker thread.
worker_thread()->Invoke<void>(RTC_FROM_HERE, [this] {
RTC_DCHECK_RUN_ON(worker_thread());
call_safety_.reset();
worker_thread_safety_->SetNotAlive();
call_.reset();
// The event log must outlive call (and any other object that uses it).
event_log_.reset();
@ -531,20 +552,6 @@ RTCError PeerConnection::Initialize(
turn_server.turn_logging_id = configuration.turn_logging_id;
}
// The port allocator lives on the network thread and should be initialized
// there. Also set up the task safety flag for canceling pending tasks on
// the network thread when closing.
// TODO(bugs.webrtc.org/12427): See if we can piggyback on this call and
// initialize all the |transport_controller_->Subscribe*| calls below on the
// network thread via this invoke.
const auto pa_result =
network_thread()->Invoke<InitializePortAllocatorResult>(
RTC_FROM_HERE, [this, &stun_servers, &turn_servers, &configuration] {
network_thread_safety_ = PendingTaskSafetyFlag::Create();
return InitializePortAllocator_n(stun_servers, turn_servers,
configuration);
});
// Note if STUN or TURN servers were supplied.
if (!stun_servers.empty()) {
NoteUsageEvent(UsageEvent::STUN_SERVER_ADDED);
@ -553,52 +560,11 @@ RTCError PeerConnection::Initialize(
NoteUsageEvent(UsageEvent::TURN_SERVER_ADDED);
}
// Send information about IPv4/IPv6 status.
PeerConnectionAddressFamilyCounter address_family;
if (pa_result.enable_ipv6) {
address_family = kPeerConnection_IPv6;
} else {
address_family = kPeerConnection_IPv4;
}
RTC_HISTOGRAM_ENUMERATION("WebRTC.PeerConnection.IPMetrics", address_family,
kPeerConnectionAddressFamilyCounter_Max);
// RFC 3264: The numeric value of the session id and version in the
// o line MUST be representable with a "64 bit signed integer".
// Due to this constraint session id |session_id_| is max limited to
// LLONG_MAX.
session_id_ = rtc::ToString(rtc::CreateRandomId64() & LLONG_MAX);
JsepTransportController::Config config;
config.redetermine_role_on_ice_restart =
configuration.redetermine_role_on_ice_restart;
config.ssl_max_version = options_.ssl_max_version;
config.disable_encryption = options_.disable_encryption;
config.bundle_policy = configuration.bundle_policy;
config.rtcp_mux_policy = configuration.rtcp_mux_policy;
// TODO(bugs.webrtc.org/9891) - Remove options_.crypto_options then remove
// this stub.
config.crypto_options = configuration.crypto_options.has_value()
? *configuration.crypto_options
: options_.crypto_options;
config.transport_observer = this;
config.rtcp_handler = InitializeRtcpCallback();
config.event_log = event_log_ptr_;
#if defined(ENABLE_EXTERNAL_AUTH)
config.enable_external_auth = true;
#endif
config.active_reset_srtp_params = configuration.active_reset_srtp_params;
if (options_.disable_encryption) {
dtls_enabled_ = false;
} else {
// Enable DTLS by default if we have an identity store or a certificate.
dtls_enabled_ =
(dependencies.cert_generator || !configuration.certificates.empty());
// |configuration| can override the default |dtls_enabled_| value.
if (configuration.enable_dtls_srtp) {
dtls_enabled_ = *(configuration.enable_dtls_srtp);
}
}
if (configuration.enable_rtp_data_channel) {
// Enable creation of RTP data channels if the kEnableRtpDataChannels is
@ -609,77 +575,27 @@ RTCError PeerConnection::Initialize(
// DTLS has to be enabled to use SCTP.
if (!options_.disable_sctp_data_channels && dtls_enabled_) {
data_channel_controller_.set_data_channel_type(cricket::DCT_SCTP);
config.sctp_factory = context_->sctp_transport_factory();
}
}
config.ice_transport_factory = ice_transport_factory_.get();
config.on_dtls_handshake_error_ =
[weak_ptr = weak_factory_.GetWeakPtr()](rtc::SSLHandshakeError s) {
if (weak_ptr) {
weak_ptr->OnTransportControllerDtlsHandshakeError(s);
}
};
transport_controller_.reset(new JsepTransportController(
signaling_thread(), network_thread(), port_allocator_.get(),
async_resolver_factory_.get(), config));
// The following RTC_DCHECKs are added by looking at the caller thread.
// If this is incorrect there might not be test failures
// due to lack of unit tests which trigger these scenarios.
// TODO(bugs.webrtc.org/12160): Remove above comments.
// callbacks for signaling_thread.
// TODO(bugs.webrtc.org/12427): If we can't piggyback on the above network
// Invoke(), then perhaps we could post these subscription calls to the
// network thread so that the transport controller doesn't have to do the
// signaling/network handling internally and use AsyncInvoker.
transport_controller_->SubscribeIceConnectionState(
[this](cricket::IceConnectionState s) {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerConnectionState(s);
});
transport_controller_->SubscribeConnectionState(
[this](PeerConnectionInterface::PeerConnectionState s) {
RTC_DCHECK_RUN_ON(signaling_thread());
SetConnectionState(s);
});
transport_controller_->SubscribeStandardizedIceConnectionState(
[this](PeerConnectionInterface::IceConnectionState s) {
RTC_DCHECK_RUN_ON(signaling_thread());
SetStandardizedIceConnectionState(s);
});
transport_controller_->SubscribeIceGatheringState(
[this](cricket::IceGatheringState s) {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerGatheringState(s);
});
transport_controller_->SubscribeIceCandidateGathered(
[this](const std::string& transport,
const std::vector<cricket::Candidate>& candidates) {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerCandidatesGathered(transport, candidates);
});
transport_controller_->SubscribeIceCandidateError(
[this](const cricket::IceCandidateErrorEvent& event) {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerCandidateError(event);
});
transport_controller_->SubscribeIceCandidatesRemoved(
[this](const std::vector<cricket::Candidate>& c) {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerCandidatesRemoved(c);
});
transport_controller_->SubscribeIceCandidatePairChanged(
[this](const cricket::CandidatePairChangeEvent& event) {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerCandidateChanged(event);
});
// Network thread initialization.
network_thread()->Invoke<void>(RTC_FROM_HERE, [this, &stun_servers,
&turn_servers, &configuration,
&dependencies] {
RTC_DCHECK_RUN_ON(network_thread());
network_thread_safety_ = PendingTaskSafetyFlag::Create();
InitializePortAllocatorResult pa_result =
InitializePortAllocator_n(stun_servers, turn_servers, configuration);
// Send information about IPv4/IPv6 status.
PeerConnectionAddressFamilyCounter address_family =
pa_result.enable_ipv6 ? kPeerConnection_IPv6 : kPeerConnection_IPv4;
RTC_HISTOGRAM_ENUMERATION("WebRTC.PeerConnection.IPMetrics", address_family,
kPeerConnectionAddressFamilyCounter_Max);
InitializeTransportController_n(configuration, dependencies);
});
configuration_ = configuration;
transport_controller_->SetIceConfig(ParseIceConfig(configuration));
stats_ = std::make_unique<StatsCollector>(this);
stats_collector_ = RTCStatsCollector::Create(this);
@ -716,6 +632,125 @@ RTCError PeerConnection::Initialize(
return RTCError::OK();
}
void PeerConnection::InitializeTransportController_n(
const RTCConfiguration& configuration,
const PeerConnectionDependencies& dependencies) {
JsepTransportController::Config config;
config.redetermine_role_on_ice_restart =
configuration.redetermine_role_on_ice_restart;
config.ssl_max_version = options_.ssl_max_version;
config.disable_encryption = options_.disable_encryption;
config.bundle_policy = configuration.bundle_policy;
config.rtcp_mux_policy = configuration.rtcp_mux_policy;
// TODO(bugs.webrtc.org/9891) - Remove options_.crypto_options then remove
// this stub.
config.crypto_options = configuration.crypto_options.has_value()
? *configuration.crypto_options
: options_.crypto_options;
config.transport_observer = this;
config.rtcp_handler = InitializeRtcpCallback();
config.event_log = event_log_ptr_;
#if defined(ENABLE_EXTERNAL_AUTH)
config.enable_external_auth = true;
#endif
config.active_reset_srtp_params = configuration.active_reset_srtp_params;
// DTLS has to be enabled to use SCTP.
if (!configuration.enable_rtp_data_channel &&
!options_.disable_sctp_data_channels && dtls_enabled_) {
config.sctp_factory = context_->sctp_transport_factory();
}
config.ice_transport_factory = ice_transport_factory_.get();
config.on_dtls_handshake_error_ =
[weak_ptr = weak_factory_.GetWeakPtr()](rtc::SSLHandshakeError s) {
if (weak_ptr) {
weak_ptr->OnTransportControllerDtlsHandshakeError(s);
}
};
transport_controller_.reset(
new JsepTransportController(network_thread(), port_allocator_.get(),
async_resolver_factory_.get(), config));
transport_controller_->SubscribeIceConnectionState(
[this](cricket::IceConnectionState s) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
ToQueuedTask(signaling_thread_safety_.flag(), [this, s]() {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerConnectionState(s);
}));
});
transport_controller_->SubscribeConnectionState(
[this](PeerConnectionInterface::PeerConnectionState s) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
ToQueuedTask(signaling_thread_safety_.flag(), [this, s]() {
RTC_DCHECK_RUN_ON(signaling_thread());
SetConnectionState(s);
}));
});
transport_controller_->SubscribeStandardizedIceConnectionState(
[this](PeerConnectionInterface::IceConnectionState s) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
ToQueuedTask(signaling_thread_safety_.flag(), [this, s]() {
RTC_DCHECK_RUN_ON(signaling_thread());
SetStandardizedIceConnectionState(s);
}));
});
transport_controller_->SubscribeIceGatheringState(
[this](cricket::IceGatheringState s) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
ToQueuedTask(signaling_thread_safety_.flag(), [this, s]() {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerGatheringState(s);
}));
});
transport_controller_->SubscribeIceCandidateGathered(
[this](const std::string& transport,
const std::vector<cricket::Candidate>& candidates) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
ToQueuedTask(signaling_thread_safety_.flag(),
[this, t = transport, c = candidates]() {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerCandidatesGathered(t, c);
}));
});
transport_controller_->SubscribeIceCandidateError(
[this](const cricket::IceCandidateErrorEvent& event) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(ToQueuedTask(
signaling_thread_safety_.flag(), [this, event = event]() {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerCandidateError(event);
}));
});
transport_controller_->SubscribeIceCandidatesRemoved(
[this](const std::vector<cricket::Candidate>& c) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
ToQueuedTask(signaling_thread_safety_.flag(), [this, c = c]() {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerCandidatesRemoved(c);
}));
});
transport_controller_->SubscribeIceCandidatePairChanged(
[this](const cricket::CandidatePairChangeEvent& event) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(ToQueuedTask(
signaling_thread_safety_.flag(), [this, event = event]() {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerCandidateChanged(event);
}));
});
transport_controller_->SetIceConfig(ParseIceConfig(configuration));
}
rtc::scoped_refptr<StreamCollectionInterface> PeerConnection::local_streams() {
RTC_DCHECK_RUN_ON(signaling_thread());
RTC_CHECK(!IsUnifiedPlan()) << "local_streams is not available with Unified "
@ -1440,6 +1475,7 @@ RTCError PeerConnection::SetConfiguration(
if (configuration_.active_reset_srtp_params !=
modified_config.active_reset_srtp_params) {
// TODO(tommi): move to the network thread - this hides an invoke.
transport_controller_->SetActiveResetSrtpParams(
modified_config.active_reset_srtp_params);
}
@ -1594,6 +1630,7 @@ void PeerConnection::StopRtcEventLog() {
rtc::scoped_refptr<DtlsTransportInterface>
PeerConnection::LookupDtlsTransportByMid(const std::string& mid) {
RTC_DCHECK_RUN_ON(signaling_thread());
// TODO(tommi): Move to the network thread - this hides an invoke.
return transport_controller_->LookupDtlsTransportByMid(mid);
}
@ -1697,13 +1734,12 @@ void PeerConnection::Close() {
port_allocator_->DiscardCandidatePool();
if (network_thread_safety_) {
network_thread_safety_->SetNotAlive();
network_thread_safety_ = nullptr;
}
});
worker_thread()->Invoke<void>(RTC_FROM_HERE, [this] {
RTC_DCHECK_RUN_ON(worker_thread());
call_safety_.reset();
worker_thread_safety_->SetNotAlive();
call_.reset();
// The event log must outlive call (and any other object that uses it).
event_log_.reset();
@ -2144,7 +2180,10 @@ bool PeerConnection::IceRestartPending(const std::string& content_name) const {
}
bool PeerConnection::NeedsIceRestart(const std::string& content_name) const {
return transport_controller_->NeedsIceRestart(content_name);
return network_thread()->Invoke<bool>(RTC_FROM_HERE, [this, &content_name] {
RTC_DCHECK_RUN_ON(network_thread());
return transport_controller_->NeedsIceRestart(content_name);
});
}
void PeerConnection::OnTransportControllerConnectionState(
@ -2487,6 +2526,7 @@ void PeerConnection::OnTransportControllerGatheringState(
}
void PeerConnection::ReportTransportStats() {
rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls;
std::map<std::string, std::set<cricket::MediaType>>
media_types_by_transport_name;
for (const auto& transceiver : rtp_manager()->transceivers()->List()) {
@ -2508,18 +2548,25 @@ void PeerConnection::ReportTransportStats() {
cricket::MEDIA_TYPE_DATA);
}
for (const auto& entry : media_types_by_transport_name) {
const std::string& transport_name = entry.first;
const std::set<cricket::MediaType> media_types = entry.second;
cricket::TransportStats stats;
if (transport_controller_->GetStats(transport_name, &stats)) {
ReportBestConnectionState(stats);
ReportNegotiatedCiphers(stats, media_types);
}
}
// Run the loop that reports the state on the network thread since the
// transport controller requires the stats to be read there (GetStats()).
network_thread()->PostTask(ToQueuedTask(
network_thread_safety_, [this, media_types_by_transport_name = std::move(
media_types_by_transport_name)] {
for (const auto& entry : media_types_by_transport_name) {
const std::string& transport_name = entry.first;
const std::set<cricket::MediaType> media_types = entry.second;
cricket::TransportStats stats;
if (transport_controller_->GetStats(transport_name, &stats)) {
ReportBestConnectionState(stats);
ReportNegotiatedCiphers(dtls_enabled_, stats, media_types);
}
}
}));
}
// Walk through the ConnectionInfos to gather best connection usage
// for IPv4 and IPv6.
// static (no member state required)
void PeerConnection::ReportBestConnectionState(
const cricket::TransportStats& stats) {
for (const cricket::TransportChannelStats& channel_stats :
@ -2567,10 +2614,12 @@ void PeerConnection::ReportBestConnectionState(
}
}
// static
void PeerConnection::ReportNegotiatedCiphers(
bool dtls_enabled,
const cricket::TransportStats& stats,
const std::set<cricket::MediaType>& media_types) {
if (!dtls_enabled_ || stats.channel_stats.empty()) {
if (!dtls_enabled || stats.channel_stats.empty()) {
return;
}
@ -2721,24 +2770,9 @@ void PeerConnection::RequestUsagePatternReportForTesting() {
std::function<void(const rtc::CopyOnWriteBuffer& packet,
int64_t packet_time_us)>
PeerConnection::InitializeRtcpCallback() {
RTC_DCHECK_RUN_ON(signaling_thread());
auto flag =
worker_thread()->Invoke<rtc::scoped_refptr<PendingTaskSafetyFlag>>(
RTC_FROM_HERE, [this] {
RTC_DCHECK_RUN_ON(worker_thread());
if (!call_)
return rtc::scoped_refptr<PendingTaskSafetyFlag>();
if (!call_safety_)
call_safety_.reset(new ScopedTaskSafety());
return call_safety_->flag();
});
if (!flag)
return [](const rtc::CopyOnWriteBuffer&, int64_t) {};
return [this, flag = std::move(flag)](const rtc::CopyOnWriteBuffer& packet,
int64_t packet_time_us) {
RTC_DCHECK_RUN_ON(network_thread());
return [this, flag = worker_thread_safety_](
const rtc::CopyOnWriteBuffer& packet, int64_t packet_time_us) {
RTC_DCHECK_RUN_ON(network_thread());
// TODO(bugs.webrtc.org/11993): We should actually be delivering this call
// directly to the Call class somehow directly on the network thread and not

View File

@ -455,7 +455,8 @@ class PeerConnection : public PeerConnectionInternal,
bool is_unified_plan,
std::unique_ptr<RtcEventLog> event_log,
std::unique_ptr<Call> call,
PeerConnectionDependencies& dependencies);
PeerConnectionDependencies& dependencies,
bool dtls_enabled);
~PeerConnection() override;
@ -463,6 +464,10 @@ class PeerConnection : public PeerConnectionInternal,
RTCError Initialize(
const PeerConnectionInterface::RTCConfiguration& configuration,
PeerConnectionDependencies dependencies);
void InitializeTransportController_n(
const RTCConfiguration& configuration,
const PeerConnectionDependencies& dependencies)
RTC_RUN_ON(network_thread());
rtc::scoped_refptr<RtpTransceiverProxyWithInternal<RtpTransceiver>>
FindTransceiverBySender(rtc::scoped_refptr<RtpSenderInterface> sender)
@ -573,11 +578,12 @@ class PeerConnection : public PeerConnectionInternal,
void ReportTransportStats() RTC_RUN_ON(signaling_thread());
// Gather the usage of IPv4/IPv6 as best connection.
void ReportBestConnectionState(const cricket::TransportStats& stats);
static void ReportBestConnectionState(const cricket::TransportStats& stats);
void ReportNegotiatedCiphers(const cricket::TransportStats& stats,
const std::set<cricket::MediaType>& media_types)
RTC_RUN_ON(signaling_thread());
static void ReportNegotiatedCiphers(
bool dtls_enabled,
const cricket::TransportStats& stats,
const std::set<cricket::MediaType>& media_types);
void ReportIceCandidateCollected(const cricket::Candidate& candidate)
RTC_RUN_ON(signaling_thread());
@ -627,8 +633,9 @@ class PeerConnection : public PeerConnectionInternal,
// TODO(zstein): |async_resolver_factory_| can currently be nullptr if it
// is not injected. It should be required once chromium supplies it.
const std::unique_ptr<AsyncResolverFactory> async_resolver_factory_
RTC_GUARDED_BY(signaling_thread());
// This member variable is only used by JsepTransportController so we should
// consider moving ownership to there.
const std::unique_ptr<AsyncResolverFactory> async_resolver_factory_;
std::unique_ptr<cricket::PortAllocator>
port_allocator_; // TODO(bugs.webrtc.org/9987): Accessed on both
// signaling and network thread.
@ -646,8 +653,7 @@ class PeerConnection : public PeerConnectionInternal,
std::unique_ptr<Call> call_ RTC_GUARDED_BY(worker_thread());
ScopedTaskSafety signaling_thread_safety_;
rtc::scoped_refptr<PendingTaskSafetyFlag> network_thread_safety_;
std::unique_ptr<ScopedTaskSafety> call_safety_
RTC_GUARDED_BY(worker_thread());
rtc::scoped_refptr<PendingTaskSafetyFlag> worker_thread_safety_;
// Points to the same thing as `call_`. Since it's const, we may read the
// pointer from any thread.
@ -681,7 +687,7 @@ class PeerConnection : public PeerConnectionInternal,
std::unique_ptr<SdpOfferAnswerHandler> sdp_handler_
RTC_GUARDED_BY(signaling_thread());
bool dtls_enabled_ RTC_GUARDED_BY(signaling_thread()) = false;
const bool dtls_enabled_;
UsagePattern usage_pattern_ RTC_GUARDED_BY(signaling_thread());
bool return_histogram_very_quickly_ RTC_GUARDED_BY(signaling_thread()) =

View File

@ -2794,7 +2794,7 @@ bool SdpOfferAnswerHandler::IceRestartPending(
bool SdpOfferAnswerHandler::NeedsIceRestart(
const std::string& content_name) const {
return transport_controller()->NeedsIceRestart(content_name);
return pc_->NeedsIceRestart(content_name);
}
absl::optional<rtc::SSLRole> SdpOfferAnswerHandler::GetDtlsRole(

View File

@ -97,8 +97,7 @@ ScenarioIceConnectionImpl::ScenarioIceConnectionImpl(
port_allocator_(
new cricket::BasicPortAllocator(manager_->network_manager())),
jsep_controller_(
new JsepTransportController(signaling_thread_,
network_thread_,
new JsepTransportController(network_thread_,
port_allocator_.get(),
/*async_resolver_factory*/ nullptr,
CreateJsepConfig())) {