Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions lib/src/core/participant_collection.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2024 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import 'dart:collection';

import '../participant/participant.dart';

/// Small helper to keep participant lookups by identity and sid in sync.
class ParticipantCollection<T extends Participant> extends IterableBase<T> {
final Map<String, T> _byIdentity = {};
final Map<String, T> _bySid = {};

// Read-only views of the collections
UnmodifiableMapView<String, T> get byIdentity => UnmodifiableMapView(_byIdentity);
UnmodifiableMapView<String, T> get bySid => UnmodifiableMapView(_bySid);

// Update methods
void set(T participant) {
_byIdentity[participant.identity] = participant;
_bySid[participant.sid] = participant;
}

// Contains methods
bool containsIdentity(String identity) => _byIdentity.containsKey(identity);
bool containsSid(String sid) => _bySid.containsKey(sid);

void clear() {
_byIdentity.clear();
_bySid.clear();
}

T? removeByIdentity(String identity) {
final participant = _byIdentity.remove(identity);
if (participant == null) return null;
return _bySid.remove(participant.sid);
}

@override
Iterator<T> get iterator => _byIdentity.values.iterator;
}
84 changes: 34 additions & 50 deletions lib/src/core/room.dart
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import '../types/rpc.dart';
import '../types/transcription_segment.dart';
import '../utils.dart' show unpackStreamId;
import 'engine.dart';
import 'participant_collection.dart';
import 'pending_track_queue.dart';

/// Room is the primary construct for LiveKit conferences. It contains a
Expand All @@ -72,10 +73,9 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
ConnectOptions get connectOptions => engine.connectOptions;
RoomOptions get roomOptions => engine.roomOptions;

/// map of identity: [[RemoteParticipant]]
UnmodifiableMapView<String, RemoteParticipant> get remoteParticipants => UnmodifiableMapView(_remoteParticipants);
final _remoteParticipants = <String, RemoteParticipant>{};
final Map<String, String> _sidToIdentity = <String, String>{};
final ParticipantCollection<RemoteParticipant> _remoteParticipants = ParticipantCollection();
UnmodifiableMapView<String, RemoteParticipant> get remoteParticipants =>
UnmodifiableMapView(_remoteParticipants.byIdentity);

/// the current participant
LocalParticipant? get localParticipant => _localParticipant;
Expand Down Expand Up @@ -373,7 +373,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
'allowed:${event.allowed}');

// find participant
final participant = _getRemoteParticipantBySid(event.participantSid);
final participant = _remoteParticipants.bySid[event.participantSid];
if (participant == null) {
return;
}
Expand Down Expand Up @@ -519,30 +519,28 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
..on<EngineFullRestartingEvent>((event) async {
events.emit(const RoomReconnectingEvent());

// clean up RemoteParticipants
final copy = _remoteParticipants.values.toList();

_remoteParticipants.clear();
_sidToIdentity.clear();
_activeSpeakers.clear();
// reset params
_name = null;
_metadata = null;
_serverVersion = null;
_serverRegion = null;

for (final participant in copy) {
final participants = _remoteParticipants.toList();
_remoteParticipants.clear();
_activeSpeakers.clear();
for (final participant in participants) {
events.emit(ParticipantDisconnectedEvent(participant: participant));
await participant.removeAllPublishedTracks(notify: false);
await participant.dispose();
}

notifyListeners();
})
..on<EngineRestartedEvent>((event) async {
// re-publish all tracks
await localParticipant?.rePublishAllTracks();

for (var participant in remoteParticipants.values) {
for (var participant in _remoteParticipants) {
for (var pub in participant.trackPublications.values) {
if (pub.subscribed) {
pub.sendUpdateTrackSettings();
Expand Down Expand Up @@ -601,7 +599,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
trackSid = streamId;
}

final participant = _getRemoteParticipantBySid(participantSid);
final participant = _remoteParticipants.bySid[participantSid];
try {
if (trackSid == null || trackSid.isEmpty) {
throw TrackSubscriptionExceptionEvent(
Expand Down Expand Up @@ -665,23 +663,15 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
if (_localParticipant?.identity == identity) {
return _localParticipant;
}
return remoteParticipants[identity];
}

RemoteParticipant? _getRemoteParticipantBySid(String sid) {
final identity = _sidToIdentity[sid];
if (identity != null) {
return remoteParticipants[identity];
}
return null;
return _remoteParticipants.byIdentity[identity];
}

Future<ParticipantCreationResult> _getOrCreateRemoteParticipant(lk_models.ParticipantInfo info) async {
if (!info.hasIdentity() || !info.hasSid()) {
throw Exception('ParticipantInfo must have identity and sid');
}

final participant = _remoteParticipants[info.identity];
final participant = _remoteParticipants.byIdentity[info.identity];
if (participant != null) {
// Return existing participant with no new publications; caller handles updates.
return ParticipantCreationResult(
Expand All @@ -695,8 +685,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
info: info,
);

_remoteParticipants[result.participant.identity] = result.participant;
_sidToIdentity[result.participant.sid] = result.participant.identity;
_remoteParticipants.set(result.participant);
await _flushPendingTracks(participant: result.participant);
return result;
}
Expand All @@ -718,7 +707,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
continue;
}

final isNew = !_remoteParticipants.containsKey(info.identity);
final isNew = !_remoteParticipants.containsIdentity(info.identity);

if (info.state == lk_models.ParticipantInfo_State.DISCONNECTED) {
hasChanged = await _handleParticipantDisconnect(info.identity);
Expand All @@ -741,12 +730,12 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
[result.participant.events, events].emit(event);
}
}
_sidToIdentity[info.sid] = info.identity;
_remoteParticipants.set(result.participant);
await _flushPendingTracks(participant: result.participant);
} else {
final wasUpdated = await result.participant.updateFromInfo(info);
if (wasUpdated) {
_sidToIdentity[info.sid] = info.identity;
_remoteParticipants.set(result.participant);
await _flushPendingTracks(participant: result.participant);
}
}
Expand All @@ -763,7 +752,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
};

for (final speaker in speakers) {
Participant? p = _getRemoteParticipantBySid(speaker.sid);
Participant? p = _remoteParticipants.bySid[speaker.sid];
if (speaker.sid == localParticipant?.sid) p = localParticipant;
if (p == null) continue;

Expand All @@ -786,7 +775,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
isConnected: connectionState == ConnectionState.connected,
participantSid: participant?.sid,
subscriber: (pending) async {
final target = participant ?? _getRemoteParticipantBySid(pending.participantSid);
final target = participant ?? _remoteParticipants.bySid[pending.participantSid];
if (target == null) return false;
try {
await target.addSubscribedMediaTrack(
Expand Down Expand Up @@ -816,7 +805,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
// localParticipant & remote participants
final allParticipants = <String, Participant>{
if (localParticipant != null) localParticipant!.sid: localParticipant!,
..._remoteParticipants,
..._remoteParticipants.bySid,
};

for (final speaker in speakers) {
Expand Down Expand Up @@ -847,7 +836,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
if (entry.participantSid == localParticipant?.sid) {
participant = localParticipant;
} else {
participant = _getRemoteParticipantBySid(entry.participantSid);
participant = _remoteParticipants.bySid[entry.participantSid];
}

if (participant != null) {
Expand All @@ -859,14 +848,12 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {

void _onSignalStreamStateUpdateEvent(List<lk_rtc.StreamStateInfo> updates) async {
for (final update in updates) {
final identity = _sidToIdentity[update.participantSid];
if (identity == null) {
logger.warning('participant not found for sid ${update.participantSid}');
// try to find RemoteParticipant
final participant = _remoteParticipants.bySid[update.participantSid];
if (participant == null) {
logger.warning('Participant not found for sid ${update.participantSid}');
continue;
}
// try to find RemoteParticipant
final participant = remoteParticipants[identity];
if (participant == null) continue;
// try to find RemoteTrackPublication
final trackPublication = participant.trackPublications[update.trackSid];
if (trackPublication == null) continue;
Expand Down Expand Up @@ -933,10 +920,8 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
}

Future<bool> _handleParticipantDisconnect(String identity) async {
final participant = _remoteParticipants.remove(identity);
if (participant == null) {
return false;
}
final participant = _remoteParticipants.removeByIdentity(identity);
if (participant == null) return false;

validateParticipantHasNoActiveDataStreams(identity);

Expand All @@ -952,7 +937,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
final trackSids = <String>[];
final trackSidsDisabled = <String>[];

for (var participant in remoteParticipants.values) {
for (var participant in _remoteParticipants) {
for (var track in participant.trackPublications.values) {
if (track.subscribed != autoSubscribe) {
trackSids.add(track.sid);
Expand Down Expand Up @@ -981,14 +966,13 @@ extension RoomPrivateMethods on Room {
logger.fine('[${objectId}] cleanUp()');

// clean up RemoteParticipants
final participants = _remoteParticipants.values.toList();
final participants = _remoteParticipants.toList();
_remoteParticipants.clear();
for (final participant in participants) {
await participant.removeAllPublishedTracks(notify: false);
// RemoteParticipant is responsible for disposing resources
await participant.dispose();
}
_remoteParticipants.clear();
_sidToIdentity.clear();
_pendingTrackQueue.clear();

// clean up LocalParticipant
Expand Down Expand Up @@ -1101,11 +1085,11 @@ extension RoomHardwareManagementMethods on Room {
/// Set audio output device.
Future<void> setAudioOutputDevice(MediaDevice device) async {
if (lkPlatformIs(PlatformType.web)) {
remoteParticipants.forEach((_, participant) {
for (var audioTrack in participant.audioTrackPublications) {
for (final participant in _remoteParticipants) {
for (final audioTrack in participant.audioTrackPublications) {
audioTrack.track?.setSinkId(device.deviceId);
}
});
}
Hardware.instance.selectedAudioOutput = device;
} else {
await Hardware.instance.selectAudioOutput(device);
Expand Down