diff --git a/vector/src/main/java/im/vector/app/features/voicebroadcast/listening/VoiceBroadcastPlayerImpl.kt b/vector/src/main/java/im/vector/app/features/voicebroadcast/listening/VoiceBroadcastPlayerImpl.kt index 168b921c2e..9afe428e59 100644 --- a/vector/src/main/java/im/vector/app/features/voicebroadcast/listening/VoiceBroadcastPlayerImpl.kt +++ b/vector/src/main/java/im/vector/app/features/voicebroadcast/listening/VoiceBroadcastPlayerImpl.kt @@ -23,53 +23,42 @@ import im.vector.app.core.di.ActiveSessionHolder import im.vector.app.features.home.room.detail.timeline.helper.AudioMessagePlaybackTracker import im.vector.app.features.voice.VoiceFailure import im.vector.app.features.voicebroadcast.getVoiceBroadcastChunk -import im.vector.app.features.voicebroadcast.getVoiceBroadcastEventId -import im.vector.app.features.voicebroadcast.isVoiceBroadcast import im.vector.app.features.voicebroadcast.listening.VoiceBroadcastPlayer.Listener import im.vector.app.features.voicebroadcast.listening.VoiceBroadcastPlayer.State +import im.vector.app.features.voicebroadcast.listening.usecase.GetLiveVoiceBroadcastChunksUseCase import im.vector.app.features.voicebroadcast.model.VoiceBroadcastState -import im.vector.app.features.voicebroadcast.model.asVoiceBroadcastEvent import im.vector.app.features.voicebroadcast.sequence import im.vector.app.features.voicebroadcast.usecase.GetVoiceBroadcastUseCase import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.withContext -import org.matrix.android.sdk.api.session.events.model.RelationType -import org.matrix.android.sdk.api.session.getRoom -import org.matrix.android.sdk.api.session.room.Room import org.matrix.android.sdk.api.session.room.model.message.MessageAudioContent import org.matrix.android.sdk.api.session.room.model.message.MessageAudioEvent -import org.matrix.android.sdk.api.session.room.model.message.asMessageAudioEvent -import org.matrix.android.sdk.api.session.room.timeline.Timeline -import org.matrix.android.sdk.api.session.room.timeline.TimelineEvent -import org.matrix.android.sdk.api.session.room.timeline.TimelineSettings import timber.log.Timber import java.util.concurrent.CopyOnWriteArrayList import javax.inject.Inject +import javax.inject.Singleton @Singleton class VoiceBroadcastPlayerImpl @Inject constructor( private val sessionHolder: ActiveSessionHolder, private val playbackTracker: AudioMessagePlaybackTracker, private val getVoiceBroadcastUseCase: GetVoiceBroadcastUseCase, + private val getLiveVoiceBroadcastChunksUseCase: GetLiveVoiceBroadcastChunksUseCase ) : VoiceBroadcastPlayer { + private val session get() = sessionHolder.getActiveSession() private val coroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.Default) private var voiceBroadcastStateJob: Job? = null - private var currentTimeline: Timeline? = null - set(value) { - field?.removeAllListeners() - field?.dispose() - field = value - } private val mediaPlayerListener = MediaPlayerListener() - private var timelineListener: TimelineListener? = null private var currentMediaPlayer: MediaPlayer? = null private var nextMediaPlayer: MediaPlayer? = null @@ -79,7 +68,10 @@ class VoiceBroadcastPlayerImpl @Inject constructor( } private var currentSequence: Int? = null + private var fetchPlaylistJob: Job? = null private var playlist = emptyList() + private var isLive: Boolean = false + override var currentVoiceBroadcastId: String? = null override var playingState = State.IDLE @@ -118,6 +110,7 @@ class VoiceBroadcastPlayerImpl @Inject constructor( // Stop playback currentMediaPlayer?.stop() currentVoiceBroadcastId?.let { playbackTracker.stopPlayback(it) } + isLive = false // Release current player release(currentMediaPlayer) @@ -131,9 +124,9 @@ class VoiceBroadcastPlayerImpl @Inject constructor( voiceBroadcastStateJob?.cancel() voiceBroadcastStateJob = null - // In case of live broadcast, stop observing new chunks - currentTimeline = null - timelineListener = null + // Do not fetch the playlist anymore + fetchPlaylistJob?.cancel() + fetchPlaylistJob = null // Update state playingState = State.IDLE @@ -141,13 +134,11 @@ class VoiceBroadcastPlayerImpl @Inject constructor( // Clear playlist playlist = emptyList() currentSequence = null + currentRoomId = null currentVoiceBroadcastId = null } - /** - * Add a [Listener] to the given voice broadcast id. - */ override fun addListener(voiceBroadcastId: String, listener: Listener) { listeners[voiceBroadcastId]?.add(listener) ?: run { listeners[voiceBroadcastId] = CopyOnWriteArrayList().apply { add(listener) } @@ -155,15 +146,11 @@ class VoiceBroadcastPlayerImpl @Inject constructor( if (voiceBroadcastId == currentVoiceBroadcastId) listener.onStateChanged(playingState) else listener.onStateChanged(State.IDLE) } - /** - * Remove a [Listener] from the given voice broadcast id. - */ override fun removeListener(voiceBroadcastId: String, listener: Listener) { listeners[voiceBroadcastId]?.remove(listener) } private fun startPlayback(roomId: String, eventId: String) { - val room = session.getRoom(roomId) ?: error("Unknown roomId: $roomId") // Stop listening previous voice broadcast if any if (playingState != State.IDLE) stop() @@ -173,16 +160,11 @@ class VoiceBroadcastPlayerImpl @Inject constructor( playingState = State.BUFFERING val voiceBroadcastState = getVoiceBroadcastUseCase.execute(roomId, eventId)?.content?.voiceBroadcastState - if (voiceBroadcastState == VoiceBroadcastState.STOPPED) { - // Get static playlist - updatePlaylist(getExistingChunks(room, eventId)) - startPlayback(false) - } else { - playLiveVoiceBroadcast(room, eventId) - } + isLive = voiceBroadcastState != null && voiceBroadcastState != VoiceBroadcastState.STOPPED + observeIncomingEvents(roomId, eventId) } - private fun startPlayback(isLive: Boolean) { + private fun startPlayback() { val event = if (isLive) playlist.lastOrNull() else playlist.firstOrNull() val content = event?.content ?: run { Timber.w("## VoiceBroadcastPlayer: No content to play"); return } val sequence = event.getVoiceBroadcastChunk()?.sequence @@ -201,24 +183,10 @@ class VoiceBroadcastPlayerImpl @Inject constructor( } } - private fun playLiveVoiceBroadcast(room: Room, eventId: String) { - room.timelineService().getTimelineEvent(eventId)?.root?.asVoiceBroadcastEvent() ?: error("Cannot retrieve voice broadcast $eventId") - updatePlaylist(getExistingChunks(room, eventId)) - startPlayback(true) - observeIncomingEvents(room, eventId) - } - - private fun getExistingChunks(room: Room, eventId: String): List { - return room.timelineService().getTimelineEventsRelatedTo(RelationType.REFERENCE, eventId) - .mapNotNull { it.root.asMessageAudioEvent() } - .filter { it.isVoiceBroadcast() } - } - - private fun observeIncomingEvents(room: Room, eventId: String) { - currentTimeline = room.timelineService().createTimeline(null, TimelineSettings(5)).also { timeline -> - timelineListener = TimelineListener(eventId).also { timeline.addListener(it) } - timeline.start() - } + private fun observeIncomingEvents(roomId: String, voiceBroadcastId: String) { + fetchPlaylistJob = getLiveVoiceBroadcastChunksUseCase.execute(roomId, voiceBroadcastId) + .onEach(this::updatePlaylist) + .launchIn(coroutineScope) } private fun resumePlayback() { @@ -229,11 +197,32 @@ class VoiceBroadcastPlayerImpl @Inject constructor( private fun updatePlaylist(playlist: List) { this.playlist = playlist.sortedBy { it.getVoiceBroadcastChunk()?.sequence?.toLong() ?: it.root.originServerTs } + onPlaylistUpdated() + } + + private fun onPlaylistUpdated() { + when (playingState) { + State.PLAYING -> { + if (nextMediaPlayer == null) { + coroutineScope.launch { nextMediaPlayer = prepareNextMediaPlayer() } + } + } + State.PAUSED -> { + if (nextMediaPlayer == null) { + coroutineScope.launch { nextMediaPlayer = prepareNextMediaPlayer() } + } + } + State.BUFFERING -> { + val newMediaContent = getNextAudioContent() + if (newMediaContent != null) startPlayback() + } + State.IDLE -> startPlayback() + } } private fun getNextAudioContent(): MessageAudioContent? { val nextSequence = currentSequence?.plus(1) - ?: timelineListener?.let { playlist.lastOrNull()?.sequence } + ?: playlist.lastOrNull()?.sequence ?: 1 return playlist.find { it.getVoiceBroadcastChunk()?.sequence == nextSequence }?.content } @@ -279,37 +268,6 @@ class VoiceBroadcastPlayerImpl @Inject constructor( } } - private inner class TimelineListener(private val voiceBroadcastId: String) : Timeline.Listener { - override fun onTimelineUpdated(snapshot: List) { - val currentSequences = playlist.map { it.sequence } - val newChunks = snapshot - .mapNotNull { timelineEvent -> - timelineEvent.root.asMessageAudioEvent() - ?.takeIf { it.isVoiceBroadcast() && it.getVoiceBroadcastEventId() == voiceBroadcastId && it.sequence !in currentSequences } - } - if (newChunks.isEmpty()) return - updatePlaylist(playlist + newChunks) - - when (playingState) { - State.PLAYING -> { - if (nextMediaPlayer == null) { - coroutineScope.launch { nextMediaPlayer = prepareNextMediaPlayer() } - } - } - State.PAUSED -> { - if (nextMediaPlayer == null) { - coroutineScope.launch { nextMediaPlayer = prepareNextMediaPlayer() } - } - } - State.BUFFERING -> { - val newMediaContent = getNextAudioContent() - if (newMediaContent != null) startPlayback(true) - } - State.IDLE -> startPlayback(true) - } - } - } - private inner class MediaPlayerListener : MediaPlayer.OnInfoListener, MediaPlayer.OnCompletionListener, MediaPlayer.OnErrorListener { override fun onInfo(mp: MediaPlayer, what: Int, extra: Int): Boolean { @@ -329,7 +287,7 @@ class VoiceBroadcastPlayerImpl @Inject constructor( val roomId = currentRoomId ?: return val voiceBroadcastId = currentVoiceBroadcastId ?: return val voiceBroadcastEventContent = getVoiceBroadcastUseCase.execute(roomId, voiceBroadcastId)?.content ?: return - val isLive = voiceBroadcastEventContent.voiceBroadcastState != null && voiceBroadcastEventContent.voiceBroadcastState != VoiceBroadcastState.STOPPED + isLive = voiceBroadcastEventContent.voiceBroadcastState != null && voiceBroadcastEventContent.voiceBroadcastState != VoiceBroadcastState.STOPPED if (!isLive && voiceBroadcastEventContent.lastChunkSequence == currentSequence) { // We'll not receive new chunks anymore so we can stop the live listening diff --git a/vector/src/main/java/im/vector/app/features/voicebroadcast/listening/usecase/GetLiveVoiceBroadcastChunksUseCase.kt b/vector/src/main/java/im/vector/app/features/voicebroadcast/listening/usecase/GetLiveVoiceBroadcastChunksUseCase.kt new file mode 100644 index 0000000000..8fbd32767d --- /dev/null +++ b/vector/src/main/java/im/vector/app/features/voicebroadcast/listening/usecase/GetLiveVoiceBroadcastChunksUseCase.kt @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2022 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package im.vector.app.features.voicebroadcast.listening.usecase + +import im.vector.app.core.di.ActiveSessionHolder +import im.vector.app.features.voicebroadcast.getVoiceBroadcastEventId +import im.vector.app.features.voicebroadcast.isVoiceBroadcast +import im.vector.app.features.voicebroadcast.model.VoiceBroadcastEvent +import im.vector.app.features.voicebroadcast.model.VoiceBroadcastState +import im.vector.app.features.voicebroadcast.model.asVoiceBroadcastEvent +import im.vector.app.features.voicebroadcast.sequence +import im.vector.app.features.voicebroadcast.usecase.GetVoiceBroadcastUseCase +import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.callbackFlow +import kotlinx.coroutines.flow.emptyFlow +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.runningReduce +import org.matrix.android.sdk.api.session.events.model.RelationType +import org.matrix.android.sdk.api.session.room.model.message.MessageAudioEvent +import org.matrix.android.sdk.api.session.room.model.message.asMessageAudioEvent +import org.matrix.android.sdk.api.session.room.timeline.Timeline +import org.matrix.android.sdk.api.session.room.timeline.TimelineEvent +import org.matrix.android.sdk.api.session.room.timeline.TimelineSettings +import javax.inject.Inject + +/** + * Get a [Flow] of [MessageAudioEvent]s related to the given voice broadcast. + */ +class GetLiveVoiceBroadcastChunksUseCase @Inject constructor( + private val activeSessionHolder: ActiveSessionHolder, + private val getVoiceBroadcastUseCase: GetVoiceBroadcastUseCase, +) { + + fun execute(roomId: String, voiceBroadcastId: String): Flow> { + val session = activeSessionHolder.getSafeActiveSession() ?: return emptyFlow() + val room = session.roomService().getRoom(roomId) ?: return emptyFlow() + val timeline = room.timelineService().createTimeline(null, TimelineSettings(5)) + + // Get initial chunks + val existingChunks = room.timelineService().getTimelineEventsRelatedTo(RelationType.REFERENCE, voiceBroadcastId) + .mapNotNull { timelineEvent -> timelineEvent.root.asMessageAudioEvent().takeIf { it.isVoiceBroadcast() } } + + val voiceBroadcastEvent = getVoiceBroadcastUseCase.execute(roomId, voiceBroadcastId) + val voiceBroadcastState = voiceBroadcastEvent?.content?.voiceBroadcastState + + return if (voiceBroadcastState == null || voiceBroadcastState == VoiceBroadcastState.STOPPED) { + // Just send the existing chunks if voice broadcast is stopped + flowOf(existingChunks) + } else { + // Observe new timeline events if voice broadcast is ongoing + callbackFlow { + // Init with existing chunks + send(existingChunks) + + // Observe new timeline events + val listener = object : Timeline.Listener { + private var lastEventId: String? = null + private var lastSequence: Int? = null + + override fun onTimelineUpdated(snapshot: List) { + val newEvents = lastEventId?.let { eventId -> snapshot.subList(0, snapshot.indexOfFirst { it.eventId == eventId }) } ?: snapshot + + // Detect a potential stopped voice broadcast state event + val stopEvent = newEvents.findStopEvent() + if (stopEvent != null) { + lastSequence = stopEvent.content?.lastChunkSequence + } + + val newChunks = newEvents.mapToChunkEvents(voiceBroadcastId, voiceBroadcastEvent.root.senderId) + + // Notify about new chunks + if (newChunks.isNotEmpty()) { + trySend(newChunks) + } + + // Automatically stop observing the timeline if the last chunk has been received + if (lastSequence != null && newChunks.any { it.sequence == lastSequence }) { + timeline.removeListener(this) + timeline.dispose() + } + + lastEventId = snapshot.firstOrNull()?.eventId + } + } + + timeline.addListener(listener) + timeline.start() + awaitClose { + timeline.removeListener(listener) + timeline.dispose() + } + } + .runningReduce { accumulator: List, value: List -> accumulator.plus(value) } + } + } + + /** + * Find a [VoiceBroadcastEvent] with a [VoiceBroadcastState.STOPPED] state. + */ + private fun List.findStopEvent(): VoiceBroadcastEvent? = + this.mapNotNull { it.root.asVoiceBroadcastEvent() } + .find { it.content?.voiceBroadcastState == VoiceBroadcastState.STOPPED } + + /** + * Transform the list of [TimelineEvent] to a mapped list of [MessageAudioEvent] related to a given voice broadcast. + */ + private fun List.mapToChunkEvents(voiceBroadcastId: String, senderId: String?): List = + this.mapNotNull { timelineEvent -> + timelineEvent.root.asMessageAudioEvent() + ?.takeIf { + it.isVoiceBroadcast() && it.getVoiceBroadcastEventId() == voiceBroadcastId && + it.root.senderId == senderId + } + } +}