diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/LiveEventListener.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/LiveEventListener.kt index 6fda65953a..65e3e94d2d 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/LiveEventListener.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/LiveEventListener.kt @@ -21,13 +21,9 @@ import org.matrix.android.sdk.api.util.JsonDict interface LiveEventListener { - fun onLiveEvent(roomId: String, event: Event) + fun onEventDecrypted(event: Event) - fun onPaginatedEvent(roomId: String, event: Event) - - fun onEventDecrypted(eventId: String, roomId: String, clearEvent: JsonDict) - - fun onEventDecryptionError(eventId: String, roomId: String, throwable: Throwable) + fun onEventDecryptionError(event: Event, throwable: Throwable) fun onLiveToDeviceEvent(event: Event) diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/algorithms/megolm/MXMegolmDecryption.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/algorithms/megolm/MXMegolmDecryption.kt index e94daa0e76..22180dce86 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/algorithms/megolm/MXMegolmDecryption.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/algorithms/megolm/MXMegolmDecryption.kt @@ -113,7 +113,7 @@ internal class MXMegolmDecryption(private val userId: String, forwardingCurve25519KeyChain = olmDecryptionResult.forwardingCurve25519KeyChain .orEmpty() ).also { - liveEventManager.get().dispatchLiveEventDecrypted(event, it) + liveEventManager.get().dispatchLiveEventDecrypted(event) } } else { throw MXCryptoError.Base(MXCryptoError.ErrorType.MISSING_FIELDS, MXCryptoError.MISSING_FIELDS_REASON) diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/StreamEventsManager.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/StreamEventsManager.kt index bb0ca11445..9876e265bf 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/StreamEventsManager.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/StreamEventsManager.kt @@ -42,36 +42,12 @@ internal class StreamEventsManager @Inject constructor() { listeners.remove(listener) } - fun dispatchLiveEventReceived(event: Event, roomId: String, initialSync: Boolean) { - Timber.v("## dispatchLiveEventReceived ${event.eventId}") - coroutineScope.launch { - if (!initialSync) { - listeners.forEach { - tryOrNull { - it.onLiveEvent(roomId, event) - } - } - } - } - } - - fun dispatchPaginatedEventReceived(event: Event, roomId: String) { - Timber.v("## dispatchPaginatedEventReceived ${event.eventId}") - coroutineScope.launch { - listeners.forEach { - tryOrNull { - it.onPaginatedEvent(roomId, event) - } - } - } - } - - fun dispatchLiveEventDecrypted(event: Event, result: MXEventDecryptionResult) { + fun dispatchLiveEventDecrypted(event: Event) { Timber.v("## dispatchLiveEventDecrypted ${event.eventId}") coroutineScope.launch { listeners.forEach { tryOrNull { - it.onEventDecrypted(event.eventId ?: "", event.roomId ?: "", result.clearEvent) + it.onEventDecrypted(event) } } } @@ -82,7 +58,7 @@ internal class StreamEventsManager @Inject constructor() { coroutineScope.launch { listeners.forEach { tryOrNull { - it.onEventDecryptionError(event.eventId ?: "", event.roomId ?: "", error) + it.onEventDecryptionError(event, error) } } } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/TokenChunkEventPersistor.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/TokenChunkEventPersistor.kt index 63383a99b3..4e940bb445 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/TokenChunkEventPersistor.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/TokenChunkEventPersistor.kt @@ -186,7 +186,7 @@ internal class TokenChunkEventPersistor @Inject constructor( } roomMemberContentsByUser[event.stateKey] = contentToUse.toModel() } - liveEventManager.get().dispatchPaginatedEventReceived(event, roomId) + currentChunk.addTimelineEvent( roomId = roomId, eventEntity = eventEntity, diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/handler/room/RoomSyncHandler.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/handler/room/RoomSyncHandler.kt index 8fe85f0d31..1cb476d03a 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/handler/room/RoomSyncHandler.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/handler/room/RoomSyncHandler.kt @@ -382,7 +382,6 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle } eventIds.add(event.eventId) - liveEventService.get().dispatchLiveEventReceived(event, roomId, insertType == EventInsertType.INITIAL_SYNC) val isInitialSync = insertType == EventInsertType.INITIAL_SYNC diff --git a/vector/src/main/java/im/vector/app/AutoRageShaker.kt b/vector/src/main/java/im/vector/app/AutoRageShaker.kt index 43283254b1..836e44f57b 100644 --- a/vector/src/main/java/im/vector/app/AutoRageShaker.kt +++ b/vector/src/main/java/im/vector/app/AutoRageShaker.kt @@ -17,23 +17,31 @@ package im.vector.app import android.content.SharedPreferences +import androidx.lifecycle.asFlow import im.vector.app.core.di.ActiveSessionHolder import im.vector.app.features.rageshake.BugReporter import im.vector.app.features.rageshake.ReportType +import im.vector.app.features.session.coroutineScope import im.vector.app.features.settings.VectorPreferences import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.delay import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.cancellable import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.subscribe import kotlinx.coroutines.launch import org.matrix.android.sdk.api.session.Session import org.matrix.android.sdk.api.session.events.model.Event import org.matrix.android.sdk.api.session.events.model.toContent +import org.matrix.android.sdk.api.session.initsync.SyncStatusService +import org.matrix.android.sdk.api.session.sync.SyncState +import org.matrix.android.sdk.flow.flow import timber.log.Timber import javax.inject.Inject import javax.inject.Singleton @@ -62,10 +70,11 @@ class AutoRageShaker @Inject constructor( private val e2eDetectedFlow = MutableSharedFlow(replay = 0) private val matchingRSRequestFlow = MutableSharedFlow(replay = 0) - + var hasSynced = false + var preferenceEnabled = false fun initialize() { observeActiveSession() - enable(vectorPreferences.labsAutoReportUISI()) + preferenceEnabled = vectorPreferences.labsAutoReportUISI() // It's a singleton... vectorPreferences.subscribeToChanges(this) @@ -74,7 +83,7 @@ class AutoRageShaker @Inject constructor( e2eDetectedFlow .onEach { sendRageShake(it) - delay(2_000) + delay(60_000) } .catch { cause -> Timber.w(cause, "Failed to RS") @@ -84,7 +93,7 @@ class AutoRageShaker @Inject constructor( matchingRSRequestFlow .onEach { sendMatchingRageShake(it) - delay(2_000) + delay(60_000) } .catch { cause -> Timber.w(cause, "Failed to send matching rageshake") @@ -93,14 +102,7 @@ class AutoRageShaker @Inject constructor( } override fun onSharedPreferenceChanged(sharedPreferences: SharedPreferences?, key: String?) { - enable(vectorPreferences.labsAutoReportUISI()) - } - - var _enabled = false - fun enable(enabled: Boolean) { - if (enabled == _enabled) return - _enabled = enabled - detector.enabled = enabled + preferenceEnabled = vectorPreferences.labsAutoReportUISI() } private fun observeActiveSession() { @@ -115,7 +117,6 @@ class AutoRageShaker @Inject constructor( } fun decryptionErrorDetected(target: E2EMessageDetected) { - if (target.source == UISIEventSource.INITIAL_SYNC) return if (activeSessionHolder.getSafeActiveSession()?.sessionId != currentActiveSessionId) return val shouldSendRS = synchronized(alreadyReportedUisi) { val reportInfo = ReportInfo(target.roomId, target.sessionId) @@ -148,7 +149,6 @@ class AutoRageShaker @Inject constructor( append("\"room_id\": \"${target.roomId}\",") append("\"sender_key\": \"${target.senderKey}\",") append("\"device_id\": \"${target.senderDeviceId}\",") - append("\"source\": \"${target.source}\",") append("\"user_id\": \"${target.senderUserId}\",") append("\"session_id\": \"${target.sessionId}\"") append("}") @@ -245,6 +245,9 @@ class AutoRageShaker @Inject constructor( override val reciprocateToDeviceEventType: String get() = AUTO_RS_REQUEST + override val enabled: Boolean + get() = this@AutoRageShaker.preferenceEnabled && this@AutoRageShaker.hasSynced + override fun uisiDetected(source: E2EMessageDetected) { decryptionErrorDetected(source) } @@ -261,7 +264,14 @@ class AutoRageShaker @Inject constructor( return } this.currentActiveSessionId = sessionId - this.detector.enabled = _enabled + + hasSynced = session.hasAlreadySynced() + session.getSyncStatusLive() + .asFlow() + .onEach { + hasSynced = it !is SyncStatusService.Status.Progressing + } + .launchIn(session.coroutineScope) activeSessionIds.add(sessionId) session.addListener(this) session.addEventStreamListener(detector) diff --git a/vector/src/main/java/im/vector/app/UISIDetector.kt b/vector/src/main/java/im/vector/app/UISIDetector.kt index d6a4805e78..f7c6725bd8 100644 --- a/vector/src/main/java/im/vector/app/UISIDetector.kt +++ b/vector/src/main/java/im/vector/app/UISIDetector.kt @@ -16,6 +16,7 @@ package im.vector.app +import org.matrix.android.sdk.api.extensions.orFalse import org.matrix.android.sdk.api.session.LiveEventListener import org.matrix.android.sdk.api.session.events.model.Event import org.matrix.android.sdk.api.session.events.model.toModel @@ -26,23 +27,17 @@ import java.util.Timer import java.util.TimerTask import java.util.concurrent.Executors -enum class UISIEventSource { - INITIAL_SYNC, - INCREMENTAL_SYNC, - PAGINATION -} - data class E2EMessageDetected( val eventId: String, val roomId: String, val senderUserId: String, val senderDeviceId: String, val senderKey: String, - val sessionId: String, - val source: UISIEventSource) { + val sessionId: String + ) { companion object { - fun fromEvent(event: Event, roomId: String, source: UISIEventSource): E2EMessageDetected { + fun fromEvent(event: Event, roomId: String): E2EMessageDetected { val encryptedContent = event.content.toModel() return E2EMessageDetected( @@ -51,8 +46,7 @@ data class E2EMessageDetected( senderUserId = event.senderId ?: "", senderDeviceId = encryptedContent?.deviceId ?: "", senderKey = encryptedContent?.senderKey ?: "", - sessionId = encryptedContent?.sessionId ?: "", - source = source + sessionId = encryptedContent?.sessionId ?: "" ) } } @@ -61,6 +55,7 @@ data class E2EMessageDetected( class UISIDetector : LiveEventListener { interface UISIDetectorCallback { + val enabled: Boolean val reciprocateToDeviceEventType: String fun uisiDetected(source: E2EMessageDetected) fun uisiReciprocateRequest(source: Event) @@ -68,30 +63,16 @@ class UISIDetector : LiveEventListener { var callback: UISIDetectorCallback? = null - private val trackedEvents = mutableListOf>() + private val trackedEvents = mutableMapOf() private val executor = Executors.newSingleThreadExecutor() private val timer = Timer() private val timeoutMillis = 30_000L - var enabled = false + val enabled: Boolean get() = callback?.enabled.orFalse() - override fun onLiveEvent(roomId: String, event: Event) { - if (!enabled) return - if (!event.isEncrypted()) return - executor.execute { - handleEventReceived(E2EMessageDetected.fromEvent(event, roomId, UISIEventSource.INCREMENTAL_SYNC)) - } - } - - override fun onPaginatedEvent(roomId: String, event: Event) { - if (!enabled) return - if (!event.isEncrypted()) return - executor.execute { - handleEventReceived(E2EMessageDetected.fromEvent(event, roomId, UISIEventSource.PAGINATION)) - } - } - - override fun onEventDecrypted(eventId: String, roomId: String, clearEvent: JsonDict) { - if (!enabled) return + override fun onEventDecrypted(event: Event) { + val eventId = event.eventId + val roomId = event.roomId + if (!enabled || eventId == null || roomId == null) return executor.execute { unTrack(eventId, roomId) } @@ -104,57 +85,39 @@ class UISIDetector : LiveEventListener { } } - override fun onEventDecryptionError(eventId: String, roomId: String, throwable: Throwable) { - if (!enabled) return - executor.execute { - unTrack(eventId, roomId)?.let { - triggerUISI(it) - } -// if (throwable is MXCryptoError.OlmError) { -// if (throwable.olmException.message == "UNKNOWN_MESSAGE_INDEX") { -// unTrack(eventId, roomId)?.let { -// triggerUISI(it) -// } -// } -// } - } - } + override fun onEventDecryptionError(event: Event, throwable: Throwable) { + val eventId = event.eventId + val roomId = event.roomId + if (!enabled || eventId == null || roomId == null) return - private fun handleEventReceived(detectorEvent: E2EMessageDetected) { - if (!enabled) return - if (trackedEvents.any { it.first == detectorEvent }) { - Timber.w("## UISIDetector: Event ${detectorEvent.eventId} is already tracked") - } else { - // track it and start timer - val timeoutTask = object : TimerTask() { - override fun run() { - executor.execute { - unTrack(detectorEvent.eventId, detectorEvent.roomId) - Timber.v("## UISIDetector: Timeout on ${detectorEvent.eventId} ") - triggerUISI(detectorEvent) - } + val trackerId: String = trackerId(eventId, roomId) + if (trackedEvents.containsKey(trackerId)) { + Timber.w("## UISIDetector: Event $eventId is already tracked") + return + } + // track it and start timer + val timeoutTask = object : TimerTask() { + override fun run() { + executor.execute { + unTrack(eventId, roomId) + Timber.v("## UISIDetector: Timeout on $eventId") + triggerUISI(E2EMessageDetected.fromEvent(event, roomId)) } } - trackedEvents.add(detectorEvent to timeoutTask) - timer.schedule(timeoutTask, timeoutMillis) } + trackedEvents[trackerId] = timeoutTask + timer.schedule(timeoutTask, timeoutMillis) } + private fun trackerId(eventId: String, roomId: String): String = "$roomId-$eventId" + private fun triggerUISI(source: E2EMessageDetected) { if (!enabled) return Timber.i("## UISIDetector: Unable To Decrypt $source") callback?.uisiDetected(source) } - private fun unTrack(eventId: String, roomId: String): E2EMessageDetected? { - val index = trackedEvents.indexOfFirst { it.first.eventId == eventId && it.first.roomId == roomId } - return if (index != -1) { - trackedEvents.removeAt(index).let { - it.second.cancel() - it.first - } - } else { - null - } + private fun unTrack(eventId: String, roomId: String) { + trackedEvents.remove(trackerId(eventId, roomId))?.cancel() } }