From 9174632cfc359cf392734aa2adedbef1e8ecd7df Mon Sep 17 00:00:00 2001 From: ganfra Date: Fri, 5 Mar 2021 19:08:46 +0100 Subject: [PATCH 1/4] Send: use coroutines and let room sending queues be independent of each others --- .../sdk/internal/database/DatabaseCleaner.kt | 2 +- .../database/RealmLiveEntityObserver.kt | 4 +- .../internal/database/RealmSessionProvider.kt | 4 +- .../sdk/internal/session/DefaultSession.kt | 10 +- .../session/SessionLifecycleObserver.kt | 4 +- .../sdk/internal/session/SessionModule.kt | 10 + .../identity/DefaultIdentityService.kt | 4 +- .../integrationmanager/IntegrationManager.kt | 4 +- .../room/send/queue/EventSenderProcessor.kt | 233 +---------------- .../queue/EventSenderProcessorCoroutine.kt | 194 ++++++++++++++ .../send/queue/EventSenderProcessorThread.kt | 240 ++++++++++++++++++ .../queue/HomeServerAvailabilityChecker.kt | 45 ++++ .../session/room/send/queue/QueueMemento.kt | 31 ++- .../session/room/send/queue/QueuedTask.kt | 12 +- .../room/send/queue/RedactQueuedTask.kt | 4 +- .../room/send/queue/SendEventQueuedTask.kt | 4 +- .../session/room/send/queue/TaskInfo.kt | 4 +- .../widgets/DefaultWidgetURLFormatter.kt | 4 +- .../internal/session/widgets/WidgetManager.kt | 4 +- 19 files changed, 549 insertions(+), 268 deletions(-) create mode 100644 matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorCoroutine.kt create mode 100644 matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorThread.kt create mode 100644 matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/HomeServerAvailabilityChecker.kt diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/DatabaseCleaner.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/DatabaseCleaner.kt index 8b4ce6106b..f11ecc5d75 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/DatabaseCleaner.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/DatabaseCleaner.kt @@ -47,7 +47,7 @@ private const val MIN_NUMBER_OF_EVENTS_BY_CHUNK = 300 internal class DatabaseCleaner @Inject constructor(@SessionDatabase private val realmConfiguration: RealmConfiguration, private val taskExecutor: TaskExecutor) : SessionLifecycleObserver { - override fun onStart() { + override fun onSessionStarted() { taskExecutor.executorScope.launch(Dispatchers.Default) { awaitTransaction(realmConfiguration) { realm -> val allRooms = realm.where(RoomEntity::class.java).findAll() diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/RealmLiveEntityObserver.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/RealmLiveEntityObserver.kt index 3e2160e666..2a0cd963b2 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/RealmLiveEntityObserver.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/RealmLiveEntityObserver.kt @@ -46,7 +46,7 @@ internal abstract class RealmLiveEntityObserver(protected val r private val backgroundRealm = AtomicReference() private lateinit var results: AtomicReference> - override fun onStart() { + override fun onSessionStarted() { if (isStarted.compareAndSet(false, true)) { BACKGROUND_HANDLER.post { val realm = Realm.getInstance(realmConfiguration) @@ -58,7 +58,7 @@ internal abstract class RealmLiveEntityObserver(protected val r } } - override fun onStop() { + override fun onSessionStopped() { if (isStarted.compareAndSet(true, false)) { BACKGROUND_HANDLER.post { results.getAndSet(null).removeAllChangeListeners() diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/RealmSessionProvider.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/RealmSessionProvider.kt index 1947cc83e3..f8d5d323a5 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/RealmSessionProvider.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/RealmSessionProvider.kt @@ -44,14 +44,14 @@ internal class RealmSessionProvider @Inject constructor(@SessionDatabase private } @MainThread - override fun onStart() { + override fun onSessionStarted() { realmThreadLocal.getOrSet { Realm.getInstance(monarchy.realmConfiguration) } } @MainThread - override fun onStop() { + override fun onSessionStopped() { realmThreadLocal.get()?.close() realmThreadLocal.remove() } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/DefaultSession.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/DefaultSession.kt index 1204a9ccac..45fcc5af2d 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/DefaultSession.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/DefaultSession.kt @@ -65,7 +65,6 @@ import org.matrix.android.sdk.internal.di.UnauthenticatedWithCertificate import org.matrix.android.sdk.internal.di.WorkManagerProvider import org.matrix.android.sdk.internal.network.GlobalErrorHandler import org.matrix.android.sdk.internal.session.identity.DefaultIdentityService -import org.matrix.android.sdk.internal.session.room.send.queue.EventSenderProcessor import org.matrix.android.sdk.internal.session.sync.SyncTokenStore import org.matrix.android.sdk.internal.session.sync.job.SyncThread import org.matrix.android.sdk.internal.session.sync.job.SyncWorker @@ -120,8 +119,7 @@ internal class DefaultSession @Inject constructor( private val thirdPartyService: Lazy, private val callSignalingService: Lazy, @UnauthenticatedWithCertificate - private val unauthenticatedWithCertificateOkHttpClient: Lazy, - private val eventSenderProcessor: EventSenderProcessor + private val unauthenticatedWithCertificateOkHttpClient: Lazy ) : Session, RoomService by roomService.get(), RoomDirectoryService by roomDirectoryService.get(), @@ -158,10 +156,9 @@ internal class DefaultSession @Inject constructor( isOpen = true cryptoService.get().ensureDevice() uiHandler.post { - lifecycleObservers.forEach { it.onStart() } + lifecycleObservers.forEach { it.onSessionStarted() } } globalErrorHandler.listener = this - eventSenderProcessor.start() } override fun requireBackgroundSync() { @@ -200,12 +197,11 @@ internal class DefaultSession @Inject constructor( stopSync() // timelineEventDecryptor.destroy() uiHandler.post { - lifecycleObservers.forEach { it.onStop() } + lifecycleObservers.forEach { it.onSessionStopped() } } cryptoService.get().close() isOpen = false globalErrorHandler.listener = null - eventSenderProcessor.interrupt() } override fun getSyncStateLive() = getSyncThread().liveState() diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/SessionLifecycleObserver.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/SessionLifecycleObserver.kt index d26e9861d0..cb37fbec75 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/SessionLifecycleObserver.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/SessionLifecycleObserver.kt @@ -27,7 +27,7 @@ internal interface SessionLifecycleObserver { Called when the session is opened */ @MainThread - fun onStart() { + fun onSessionStarted() { // noop } @@ -43,7 +43,7 @@ internal interface SessionLifecycleObserver { Called when the session is closed */ @MainThread - fun onStop() { + fun onSessionStopped() { // noop } } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/SessionModule.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/SessionModule.kt index 57c2336331..1b0a2fa027 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/SessionModule.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/SessionModule.kt @@ -83,6 +83,8 @@ import org.matrix.android.sdk.internal.session.permalinks.DefaultPermalinkServic import org.matrix.android.sdk.internal.session.room.EventRelationsAggregationProcessor import org.matrix.android.sdk.internal.session.room.create.RoomCreateEventProcessor import org.matrix.android.sdk.internal.session.room.prune.RedactionEventProcessor +import org.matrix.android.sdk.internal.session.room.send.queue.EventSenderProcessor +import org.matrix.android.sdk.internal.session.room.send.queue.EventSenderProcessorCoroutine import org.matrix.android.sdk.internal.session.room.tombstone.RoomTombstoneEventProcessor import org.matrix.android.sdk.internal.session.securestorage.DefaultSecureStorageService import org.matrix.android.sdk.internal.session.typing.DefaultTypingUsersTracker @@ -339,6 +341,10 @@ internal abstract class SessionModule { @IntoSet abstract fun bindRealmSessionProvider(provider: RealmSessionProvider): SessionLifecycleObserver + @Binds + @IntoSet + abstract fun bindEventSenderProcessorAsSessionLifecycleObserver(processor: EventSenderProcessorCoroutine): SessionLifecycleObserver + @Binds abstract fun bindInitialSyncProgressService(service: DefaultInitialSyncProgressService): InitialSyncProgressService @@ -362,4 +368,8 @@ internal abstract class SessionModule { @Binds abstract fun bindRedactEventTask(task: DefaultRedactEventTask): RedactEventTask + + @Binds + abstract fun bindEventSenderProcessor(processor: EventSenderProcessorCoroutine): EventSenderProcessor + } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/identity/DefaultIdentityService.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/identity/DefaultIdentityService.kt index c6fb34151c..948e387cb1 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/identity/DefaultIdentityService.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/identity/DefaultIdentityService.kt @@ -92,7 +92,7 @@ internal class DefaultIdentityService @Inject constructor( private val listeners = mutableSetOf() - override fun onStart() { + override fun onSessionStarted() { lifecycleRegistry.currentState = Lifecycle.State.STARTED // Observe the account data change accountDataDataSource @@ -117,7 +117,7 @@ internal class DefaultIdentityService @Inject constructor( } } - override fun onStop() { + override fun onSessionStopped() { lifecycleRegistry.currentState = Lifecycle.State.DESTROYED } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/integrationmanager/IntegrationManager.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/integrationmanager/IntegrationManager.kt index 19a87103f4..e34615d269 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/integrationmanager/IntegrationManager.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/integrationmanager/IntegrationManager.kt @@ -77,7 +77,7 @@ internal class IntegrationManager @Inject constructor(matrixConfiguration: Matri currentConfigs.add(defaultConfig) } - override fun onStart() { + override fun onSessionStarted() { lifecycleRegistry.currentState = Lifecycle.State.STARTED observeWellknownConfig() accountDataDataSource @@ -105,7 +105,7 @@ internal class IntegrationManager @Inject constructor(matrixConfiguration: Matri } } - override fun onStop() { + override fun onSessionStopped() { lifecycleRegistry.currentState = Lifecycle.State.DESTROYED } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessor.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessor.kt index 62338a1d07..05d0876ef0 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessor.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessor.kt @@ -1,11 +1,11 @@ /* - * Copyright 2020 The Matrix.org Foundation C.I.C. + * Copyright (c) 2021 New Vector Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,235 +16,22 @@ package org.matrix.android.sdk.internal.session.room.send.queue -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking -import org.matrix.android.sdk.api.auth.data.SessionParams -import org.matrix.android.sdk.api.auth.data.sessionId -import org.matrix.android.sdk.api.extensions.tryOrNull -import org.matrix.android.sdk.api.failure.Failure -import org.matrix.android.sdk.api.failure.MatrixError -import org.matrix.android.sdk.api.failure.isTokenError -import org.matrix.android.sdk.api.session.crypto.CryptoService import org.matrix.android.sdk.api.session.events.model.Event -import org.matrix.android.sdk.api.session.sync.SyncState import org.matrix.android.sdk.api.util.Cancelable -import org.matrix.android.sdk.internal.session.SessionScope -import org.matrix.android.sdk.internal.task.TaskExecutor -import timber.log.Timber -import java.io.IOException -import java.net.InetAddress -import java.net.InetSocketAddress -import java.net.Socket -import java.util.Timer -import java.util.TimerTask -import java.util.concurrent.LinkedBlockingQueue -import javax.inject.Inject -import kotlin.concurrent.schedule +import org.matrix.android.sdk.internal.session.SessionLifecycleObserver -/** - * A simple ever running thread unique for that session responsible of sending events in order. - * Each send is retried 3 times, if there is no network (e.g if cannot ping home server) it will wait and - * periodically test reachability before resume (does not count as a retry) - * - * If the app is killed before all event were sent, on next wakeup the scheduled events will be re posted - */ -@SessionScope -internal class EventSenderProcessor @Inject constructor( - private val cryptoService: CryptoService, - private val sessionParams: SessionParams, - private val queuedTaskFactory: QueuedTaskFactory, - private val taskExecutor: TaskExecutor, - private val memento: QueueMemento -) : Thread("SENDER_THREAD_SID_${sessionParams.credentials.sessionId()}") { +internal interface EventSenderProcessor: SessionLifecycleObserver { - private fun markAsManaged(task: QueuedTask) { - memento.track(task) - } + fun postEvent(event: Event): Cancelable - private fun markAsFinished(task: QueuedTask) { - memento.unTrack(task) - } + fun postEvent(event: Event, encrypt: Boolean): Cancelable - // API - fun postEvent(event: Event): Cancelable { - return postEvent(event, event.roomId?.let { cryptoService.isRoomEncrypted(it) } ?: false) - } + fun postRedaction(redactionLocalEcho: Event, reason: String?): Cancelable - override fun start() { - super.start() - // We should check for sending events not handled because app was killed - // But we should be careful of only took those that was submitted to us, because if it's - // for example it's a media event it is handled by some worker and he will handle it - // This is a bit fragile :/ - // also some events cannot be retried manually by users, e.g reactions - // they were previously relying on workers to do the work :/ and was expected to always finally succeed - // Also some echos are not to be resent like redaction echos (fake event created for aggregation) + fun postRedaction(redactionLocalEchoId: String, eventToRedactId: String, roomId: String, reason: String?): Cancelable - tryOrNull { - taskExecutor.executorScope.launch { - Timber.d("## Send relaunched pending events on restart") - memento.restoreTasks(this@EventSenderProcessor) - } - } - } + fun postTask(task: QueuedTask): Cancelable - fun postEvent(event: Event, encrypt: Boolean): Cancelable { - val task = queuedTaskFactory.createSendTask(event, encrypt) - return postTask(task) - } + fun cancel(eventId: String, roomId: String) - fun postRedaction(redactionLocalEcho: Event, reason: String?): Cancelable { - return postRedaction(redactionLocalEcho.eventId!!, redactionLocalEcho.redacts!!, redactionLocalEcho.roomId!!, reason) - } - - fun postRedaction(redactionLocalEchoId: String, eventToRedactId: String, roomId: String, reason: String?): Cancelable { - val task = queuedTaskFactory.createRedactTask(redactionLocalEchoId, eventToRedactId, roomId, reason) - return postTask(task) - } - - fun postTask(task: QueuedTask): Cancelable { - // non blocking add to queue - sendingQueue.add(task) - markAsManaged(task) - return task - } - - fun cancel(eventId: String, roomId: String) { - (currentTask as? SendEventQueuedTask) - ?.takeIf { it -> it.event.eventId == eventId && it.event.roomId == roomId } - ?.cancel() - } - - companion object { - private const val RETRY_WAIT_TIME_MS = 10_000L - } - - private var currentTask: QueuedTask? = null - - private var sendingQueue = LinkedBlockingQueue() - - private var networkAvailableLock = Object() - private var canReachServer = true - private var retryNoNetworkTask: TimerTask? = null - - override fun run() { - Timber.v("## SendThread started ts:${System.currentTimeMillis()}") - try { - while (!isInterrupted) { - Timber.v("## SendThread wait for task to process") - val task = sendingQueue.take() - .also { currentTask = it } - Timber.v("## SendThread Found task to process $task") - - if (task.isCancelled()) { - Timber.v("## SendThread send cancelled for $task") - // we do not execute this one - continue - } - // we check for network connectivity - while (!canReachServer) { - Timber.v("## SendThread cannot reach server, wait ts:${System.currentTimeMillis()}") - // schedule to retry - waitForNetwork() - // if thread as been killed meanwhile -// if (state == State.KILLING) break - } - Timber.v("## Server is Reachable") - // so network is available - - runBlocking { - retryLoop@ while (task.retryCount < 3) { - try { - // SendPerformanceProfiler.startStage(task.event.eventId!!, SendPerformanceProfiler.Stages.SEND_WORKER) - Timber.v("## SendThread retryLoop for $task retryCount ${task.retryCount}") - task.execute() - // sendEventTask.execute(SendEventTask.Params(task.event, task.encrypt, cryptoService)) - // SendPerformanceProfiler.stopStage(task.event.eventId, SendPerformanceProfiler.Stages.SEND_WORKER) - break@retryLoop - } catch (exception: Throwable) { - when { - exception is IOException || exception is Failure.NetworkConnection -> { - canReachServer = false - task.retryCount++ - if (task.retryCount >= 3) task.onTaskFailed() - while (!canReachServer) { - Timber.v("## SendThread retryLoop cannot reach server, wait ts:${System.currentTimeMillis()}") - // schedule to retry - waitForNetwork() - } - } - (exception is Failure.ServerError && exception.error.code == MatrixError.M_LIMIT_EXCEEDED) -> { - task.retryCount++ - if (task.retryCount >= 3) task.onTaskFailed() - Timber.v("## SendThread retryLoop retryable error for $task reason: ${exception.localizedMessage}") - // wait a bit - // Todo if its a quota exception can we get timout? - sleep(3_000) - continue@retryLoop - } - exception.isTokenError() -> { - Timber.v("## SendThread retryLoop retryable TOKEN error, interrupt") - // we can exit the loop - task.onTaskFailed() - throw InterruptedException() - } - exception is CancellationException -> { - Timber.v("## SendThread task has been cancelled") - break@retryLoop - } - else -> { - Timber.v("## SendThread retryLoop Un-Retryable error, try next task") - // this task is in error, check next one? - task.onTaskFailed() - break@retryLoop - } - } - } - } - } - markAsFinished(task) - } - } catch (interruptionException: InterruptedException) { - // will be thrown is thread is interrupted while seeping - interrupt() - Timber.v("## InterruptedException!! ${interruptionException.localizedMessage}") - } -// state = State.KILLED - // is this needed? - retryNoNetworkTask?.cancel() - Timber.w("## SendThread finished ${System.currentTimeMillis()}") - } - - private fun waitForNetwork() { - retryNoNetworkTask = Timer(SyncState.NoNetwork.toString(), false).schedule(RETRY_WAIT_TIME_MS) { - synchronized(networkAvailableLock) { - canReachServer = checkHostAvailable().also { - Timber.v("## SendThread checkHostAvailable $it") - } - networkAvailableLock.notify() - } - } - synchronized(networkAvailableLock) { networkAvailableLock.wait() } - } - - /** - * Check if homeserver is reachable. - */ - private fun checkHostAvailable(): Boolean { - val host = sessionParams.homeServerConnectionConfig.homeServerUri.host ?: return false - val port = sessionParams.homeServerConnectionConfig.homeServerUri.port.takeIf { it != -1 } ?: 80 - val timeout = 30_000 - try { - Socket().use { socket -> - val inetAddress: InetAddress = InetAddress.getByName(host) - val inetSocketAddress = InetSocketAddress(inetAddress, port) - socket.connect(inetSocketAddress, timeout) - return true - } - } catch (e: IOException) { - Timber.v("## EventSender isHostAvailable failure ${e.localizedMessage}") - return false - } - } } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorCoroutine.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorCoroutine.kt new file mode 100644 index 0000000000..77e156f90a --- /dev/null +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorCoroutine.kt @@ -0,0 +1,194 @@ +/* + * Copyright (c) 2021 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.matrix.android.sdk.internal.session.room.send.queue + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext +import org.matrix.android.sdk.api.auth.data.SessionParams +import org.matrix.android.sdk.api.failure.Failure +import org.matrix.android.sdk.api.failure.MatrixError +import org.matrix.android.sdk.api.session.crypto.CryptoService +import org.matrix.android.sdk.api.session.events.model.Event +import org.matrix.android.sdk.api.util.Cancelable +import org.matrix.android.sdk.internal.session.SessionScope +import org.matrix.android.sdk.internal.task.CoroutineSequencer +import org.matrix.android.sdk.internal.task.SemaphoreCoroutineSequencer +import org.matrix.android.sdk.internal.task.TaskExecutor +import org.matrix.android.sdk.internal.util.toCancelable +import timber.log.Timber +import java.io.IOException +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicBoolean +import javax.inject.Inject +import kotlin.coroutines.cancellation.CancellationException + +private const val RETRY_WAIT_TIME_MS = 10_000L + +/** + * This class is responsible for sending events in order in each room. It uses the QueuedTask.queueIdentifier to execute tasks sequentially. + * Each send is retried 3 times, if there is no network (e.g if cannot ping home server) it will wait and + * periodically test reachability before resume (does not count as a retry) + * + * If the app is killed before all event were sent, on next wakeup the scheduled events will be re posted + * + */ +@SessionScope +internal class EventSenderProcessorCoroutine @Inject constructor( + private val cryptoService: CryptoService, + private val sessionParams: SessionParams, + private val queuedTaskFactory: QueuedTaskFactory, + private val taskExecutor: TaskExecutor, + private val memento: QueueMemento, +) : EventSenderProcessor { + + /** + * sequencers use QueuedTask.queueIdentifier as key + */ + private val sequencers = ConcurrentHashMap() + + /** + * cancelableBag use QueuedTask.taskIdentifier as key + */ + private val cancelableBag = ConcurrentHashMap() + + override fun onSessionStarted() { + // We should check for sending events not handled because app was killed + // But we should be careful of only took those that was submitted to us, because if it's + // for example it's a media event it is handled by some worker and he will handle it + // This is a bit fragile :/ + // also some events cannot be retried manually by users, e.g reactions + // they were previously relying on workers to do the work :/ and was expected to always finally succeed + // Also some echos are not to be resent like redaction echos (fake event created for aggregation) + taskExecutor.executorScope.launch { + Timber.d("## Send relaunched pending events on restart") + try { + memento.restoreTasks(this@EventSenderProcessorCoroutine) + } catch (failure: Throwable) { + Timber.e(failure, "Fail restoring send tasks") + } + } + } + + override fun postEvent(event: Event): Cancelable { + return postEvent(event, event.roomId?.let { cryptoService.isRoomEncrypted(it) } ?: false) + } + + override fun postEvent(event: Event, encrypt: Boolean): Cancelable { + val task = queuedTaskFactory.createSendTask(event, encrypt) + return postTask(task) + } + + override fun postRedaction(redactionLocalEcho: Event, reason: String?): Cancelable { + return postRedaction(redactionLocalEcho.eventId!!, redactionLocalEcho.redacts!!, redactionLocalEcho.roomId!!, reason) + } + + override fun postRedaction(redactionLocalEchoId: String, eventToRedactId: String, roomId: String, reason: String?): Cancelable { + val task = queuedTaskFactory.createRedactTask(redactionLocalEchoId, eventToRedactId, roomId, reason) + return postTask(task) + } + + override fun postTask(task: QueuedTask): Cancelable { + markAsManaged(task) + val sequencer = sequencers.getOrPut(task.queueIdentifier) { + SemaphoreCoroutineSequencer() + } + return taskExecutor.executorScope + .launchWith(sequencer) { + executeTask(task) + }.toCancelable() + .also { + cancelableBag[task.taskIdentifier] + } + } + + override fun cancel(eventId: String, roomId: String) { + // eventId is the taskIdentifier + cancelableBag[eventId]?.cancel() + } + + private fun CoroutineScope.launchWith(sequencer: CoroutineSequencer, block: suspend CoroutineScope.() -> Unit) = launch { + sequencer.post { + block() + } + } + + private suspend fun executeTask(task: QueuedTask) { + try { + if (task.isCancelled()) { + Timber.v("## task has been cancelled") + return + } + waitForNetwork() + Timber.v("## execute task with id: ${task.queueIdentifier}") + task.execute() + } catch (exception: Throwable) { + when { + exception is IOException || exception is Failure.NetworkConnection -> { + canReachServer.set(false) + task.retryCount++ + if (task.retryCount >= 3) task.onTaskFailed() + Timber.v("## retryable error for $task reason: ${exception.localizedMessage}") + // Retry + executeTask(task) + } + (exception is Failure.ServerError && exception.error.code == MatrixError.M_LIMIT_EXCEEDED) -> { + task.retryCount++ + if (task.retryCount >= 3) task.onTaskFailed() + Timber.v("## retryable error for $task reason: ${exception.localizedMessage}") + // Wait some time + delay(exception.error.retryAfterMillis?.plus(100) ?: 3_000) + // and retry + executeTask(task) + } + exception is CancellationException -> { + Timber.v("## task has been cancelled") + } + else -> { + Timber.v("## Un-Retryable error, try next task") + // this task is in error, check next one? + task.onTaskFailed() + } + } + } + markAsFinished(task) + } + + private fun markAsManaged(task: QueuedTask) { + memento.track(task) + } + + private fun markAsFinished(task: QueuedTask) { + cancelableBag.remove(task.taskIdentifier) + memento.unTrack(task) + } + + private val canReachServer = AtomicBoolean(true) + + private suspend fun waitForNetwork() { + while (!canReachServer.get()) { + Timber.v("## Cannot reach server, wait ts:${System.currentTimeMillis()}") + delay(RETRY_WAIT_TIME_MS) + withContext(Dispatchers.IO) { + val hostAvailable = HomeServerAvailabilityChecker(sessionParams).check() + canReachServer.set(hostAvailable) + } + } + } +} diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorThread.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorThread.kt new file mode 100644 index 0000000000..c3a9eb4a07 --- /dev/null +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorThread.kt @@ -0,0 +1,240 @@ +/* + * Copyright 2020 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.matrix.android.sdk.internal.session.room.send.queue + +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.matrix.android.sdk.api.auth.data.SessionParams +import org.matrix.android.sdk.api.auth.data.sessionId +import org.matrix.android.sdk.api.extensions.tryOrNull +import org.matrix.android.sdk.api.failure.Failure +import org.matrix.android.sdk.api.failure.MatrixError +import org.matrix.android.sdk.api.failure.isTokenError +import org.matrix.android.sdk.api.session.crypto.CryptoService +import org.matrix.android.sdk.api.session.events.model.Event +import org.matrix.android.sdk.api.session.sync.SyncState +import org.matrix.android.sdk.api.util.Cancelable +import org.matrix.android.sdk.internal.session.SessionScope +import org.matrix.android.sdk.internal.task.TaskExecutor +import timber.log.Timber +import java.io.IOException +import java.net.InetAddress +import java.net.InetSocketAddress +import java.net.Socket +import java.util.Timer +import java.util.TimerTask +import java.util.concurrent.LinkedBlockingQueue +import javax.inject.Inject +import kotlin.concurrent.schedule + +/** + * A simple ever running thread unique for that session responsible of sending events in order. + * Each send is retried 3 times, if there is no network (e.g if cannot ping home server) it will wait and + * periodically test reachability before resume (does not count as a retry) + * + * If the app is killed before all event were sent, on next wakeup the scheduled events will be re posted + */ +@SessionScope +internal class EventSenderProcessorThread @Inject constructor( + private val cryptoService: CryptoService, + private val sessionParams: SessionParams, + private val queuedTaskFactory: QueuedTaskFactory, + private val taskExecutor: TaskExecutor, + private val memento: QueueMemento +) : Thread("SENDER_THREAD_SID_${sessionParams.credentials.sessionId()}"), EventSenderProcessor { + + private fun markAsManaged(task: QueuedTask) { + memento.track(task) + } + + private fun markAsFinished(task: QueuedTask) { + memento.unTrack(task) + } + + override fun onSessionStarted() { + start() + } + + override fun onSessionStopped() { + interrupt() + } + + + override fun start() { + super.start() + // We should check for sending events not handled because app was killed + // But we should be careful of only took those that was submitted to us, because if it's + // for example it's a media event it is handled by some worker and he will handle it + // This is a bit fragile :/ + // also some events cannot be retried manually by users, e.g reactions + // they were previously relying on workers to do the work :/ and was expected to always finally succeed + // Also some echos are not to be resent like redaction echos (fake event created for aggregation) + + tryOrNull { + taskExecutor.executorScope.launch { + Timber.d("## Send relaunched pending events on restart") + memento.restoreTasks(this@EventSenderProcessorThread) + } + } + } + + // API + override fun postEvent(event: Event): Cancelable { + return postEvent(event, event.roomId?.let { cryptoService.isRoomEncrypted(it) } ?: false) + } + + override fun postEvent(event: Event, encrypt: Boolean): Cancelable { + val task = queuedTaskFactory.createSendTask(event, encrypt) + return postTask(task) + } + + override fun postRedaction(redactionLocalEcho: Event, reason: String?): Cancelable { + return postRedaction(redactionLocalEcho.eventId!!, redactionLocalEcho.redacts!!, redactionLocalEcho.roomId!!, reason) + } + + override fun postRedaction(redactionLocalEchoId: String, eventToRedactId: String, roomId: String, reason: String?): Cancelable { + val task = queuedTaskFactory.createRedactTask(redactionLocalEchoId, eventToRedactId, roomId, reason) + return postTask(task) + } + + override fun postTask(task: QueuedTask): Cancelable { + // non blocking add to queue + sendingQueue.add(task) + markAsManaged(task) + return task + } + + override fun cancel(eventId: String, roomId: String) { + (currentTask as? SendEventQueuedTask) + ?.takeIf { it -> it.event.eventId == eventId && it.event.roomId == roomId } + ?.cancel() + } + + companion object { + private const val RETRY_WAIT_TIME_MS = 10_000L + } + + private var currentTask: QueuedTask? = null + + private var sendingQueue = LinkedBlockingQueue() + + private var networkAvailableLock = Object() + private var canReachServer = true + private var retryNoNetworkTask: TimerTask? = null + + override fun run() { + Timber.v("## SendThread started ts:${System.currentTimeMillis()}") + try { + while (!isInterrupted) { + Timber.v("## SendThread wait for task to process") + val task = sendingQueue.take() + .also { currentTask = it } + Timber.v("## SendThread Found task to process $task") + + if (task.isCancelled()) { + Timber.v("## SendThread send cancelled for $task") + // we do not execute this one + continue + } + // we check for network connectivity + while (!canReachServer) { + Timber.v("## SendThread cannot reach server, wait ts:${System.currentTimeMillis()}") + // schedule to retry + waitForNetwork() + // if thread as been killed meanwhile +// if (state == State.KILLING) break + } + Timber.v("## Server is Reachable") + // so network is available + + runBlocking { + retryLoop@ while (task.retryCount < 3) { + try { + // SendPerformanceProfiler.startStage(task.event.eventId!!, SendPerformanceProfiler.Stages.SEND_WORKER) + Timber.v("## SendThread retryLoop for $task retryCount ${task.retryCount}") + task.execute() + // sendEventTask.execute(SendEventTask.Params(task.event, task.encrypt, cryptoService)) + // SendPerformanceProfiler.stopStage(task.event.eventId, SendPerformanceProfiler.Stages.SEND_WORKER) + break@retryLoop + } catch (exception: Throwable) { + when { + exception is IOException || exception is Failure.NetworkConnection -> { + canReachServer = false + task.retryCount++ + if (task.retryCount >= 3) task.onTaskFailed() + while (!canReachServer) { + Timber.v("## SendThread retryLoop cannot reach server, wait ts:${System.currentTimeMillis()}") + // schedule to retry + waitForNetwork() + } + } + (exception is Failure.ServerError && exception.error.code == MatrixError.M_LIMIT_EXCEEDED) -> { + task.retryCount++ + if (task.retryCount >= 3) task.onTaskFailed() + Timber.v("## SendThread retryLoop retryable error for $task reason: ${exception.localizedMessage}") + // wait a bit + // Todo if its a quota exception can we get timout? + sleep(3_000) + continue@retryLoop + } + exception.isTokenError() -> { + Timber.v("## SendThread retryLoop retryable TOKEN error, interrupt") + // we can exit the loop + task.onTaskFailed() + throw InterruptedException() + } + exception is CancellationException -> { + Timber.v("## SendThread task has been cancelled") + break@retryLoop + } + else -> { + Timber.v("## SendThread retryLoop Un-Retryable error, try next task") + // this task is in error, check next one? + task.onTaskFailed() + break@retryLoop + } + } + } + } + } + markAsFinished(task) + } + } catch (interruptionException: InterruptedException) { + // will be thrown is thread is interrupted while seeping + interrupt() + Timber.v("## InterruptedException!! ${interruptionException.localizedMessage}") + } +// state = State.KILLED + // is this needed? + retryNoNetworkTask?.cancel() + Timber.w("## SendThread finished ${System.currentTimeMillis()}") + } + + private fun waitForNetwork() { + retryNoNetworkTask = Timer(SyncState.NoNetwork.toString(), false).schedule(RETRY_WAIT_TIME_MS) { + synchronized(networkAvailableLock) { + canReachServer = HomeServerAvailabilityChecker(sessionParams).check().also { + Timber.v("## SendThread checkHostAvailable $it") + } + networkAvailableLock.notify() + } + } + synchronized(networkAvailableLock) { networkAvailableLock.wait() } + } + +} diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/HomeServerAvailabilityChecker.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/HomeServerAvailabilityChecker.kt new file mode 100644 index 0000000000..c68be74a64 --- /dev/null +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/HomeServerAvailabilityChecker.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2021 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.matrix.android.sdk.internal.session.room.send.queue + +import org.matrix.android.sdk.api.auth.data.SessionParams +import timber.log.Timber +import java.io.IOException +import java.net.InetAddress +import java.net.InetSocketAddress +import java.net.Socket + +internal class HomeServerAvailabilityChecker(val sessionParams: SessionParams) { + + fun check(): Boolean { + val host = sessionParams.homeServerConnectionConfig.homeServerUri.host ?: return false + val port = sessionParams.homeServerConnectionConfig.homeServerUri.port.takeIf { it != -1 } ?: 80 + val timeout = 30_000 + try { + Socket().use { socket -> + val inetAddress: InetAddress = InetAddress.getByName(host) + val inetSocketAddress = InetSocketAddress(inetAddress, port) + socket.connect(inetSocketAddress, timeout) + return true + } + } catch (e: IOException) { + Timber.v("## EventSender isHostAvailable failure ${e.localizedMessage}") + return false + } + } + +} diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueueMemento.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueueMemento.kt index a6836c8086..472e4d440f 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueueMemento.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueueMemento.kt @@ -32,6 +32,9 @@ import javax.inject.Inject * It is just used to remember what events/localEchos was managed by the event sender in order to * reschedule them (and only them) on next restart */ + +private const val PERSISTENCE_KEY = "ManagedBySender" + internal class QueueMemento @Inject constructor(context: Context, @SessionId sessionId: String, private val queuedTaskFactory: QueuedTaskFactory, @@ -39,28 +42,28 @@ internal class QueueMemento @Inject constructor(context: Context, private val cryptoService: CryptoService) { private val storage = context.getSharedPreferences("QueueMemento_$sessionId", Context.MODE_PRIVATE) - private val managedTaskInfos = mutableListOf() + private val trackedTasks = mutableListOf() - fun track(task: QueuedTask) { - synchronized(managedTaskInfos) { - managedTaskInfos.add(task) - persist() - } + fun track(task: QueuedTask) = synchronized(trackedTasks) { + trackedTasks.add(task) + persist() } - fun unTrack(task: QueuedTask) { - synchronized(managedTaskInfos) { - managedTaskInfos.remove(task) - persist() - } + fun unTrack(task: QueuedTask) = synchronized(trackedTasks) { + trackedTasks.remove(task) + persist() + } + + fun trackedTasks() = synchronized(trackedTasks){ + } private fun persist() { - managedTaskInfos.mapIndexedNotNull { index, queuedTask -> + trackedTasks.mapIndexedNotNull { index, queuedTask -> toTaskInfo(queuedTask, index)?.let { TaskInfo.map(it) } }.toSet().let { set -> storage.edit() - .putStringSet("ManagedBySender", set) + .putStringSet(PERSISTENCE_KEY, set) .apply() } } @@ -82,7 +85,7 @@ internal class QueueMemento @Inject constructor(context: Context, suspend fun restoreTasks(eventProcessor: EventSenderProcessor) { // events should be restarted in correct order - storage.getStringSet("ManagedBySender", null)?.let { pending -> + storage.getStringSet(PERSISTENCE_KEY, null)?.let { pending -> Timber.d("## Send - Recovering unsent events $pending") pending.mapNotNull { tryOrNull { TaskInfo.map(it) } } } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueuedTask.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueuedTask.kt index 9a7fcd8d91..a4eb9e9323 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueuedTask.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueuedTask.kt @@ -18,7 +18,17 @@ package org.matrix.android.sdk.internal.session.room.send.queue import org.matrix.android.sdk.api.util.Cancelable -abstract class QueuedTask : Cancelable { +/** + * @param queueIdentifier String value to identify a unique Queue + * @param taskIdentifier String value to identify a unique Task. Should be different from queueIdentifier + */ +internal abstract class QueuedTask( + val queueIdentifier: String, + val taskIdentifier: String +) : Cancelable { + + override fun toString() = "queueIdentifier: $queueIdentifier, taskIdentifier: ${taskIdentifier})" + var retryCount = 0 private var hasBeenCancelled: Boolean = false diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/RedactQueuedTask.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/RedactQueuedTask.kt index 8e7ba2f155..0e3d88aa79 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/RedactQueuedTask.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/RedactQueuedTask.kt @@ -29,9 +29,7 @@ internal class RedactQueuedTask( private val redactEventTask: RedactEventTask, private val localEchoRepository: LocalEchoRepository, private val cancelSendTracker: CancelSendTracker -) : QueuedTask() { - - override fun toString() = "[RedactQueuedTask $redactionLocalEchoId]" +) : QueuedTask(queueIdentifier = roomId, taskIdentifier = redactionLocalEchoId) { override suspend fun doExecute() { redactEventTask.execute(RedactEventTask.Params(redactionLocalEchoId, roomId, toRedactEventId, reason)) diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/SendEventQueuedTask.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/SendEventQueuedTask.kt index ea097082c7..49492e7990 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/SendEventQueuedTask.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/SendEventQueuedTask.kt @@ -31,9 +31,7 @@ internal class SendEventQueuedTask( val cryptoService: CryptoService, val localEchoRepository: LocalEchoRepository, val cancelSendTracker: CancelSendTracker -) : QueuedTask() { - - override fun toString() = "[SendEventQueuedTask ${event.eventId}]" +) : QueuedTask(queueIdentifier = event.roomId!!, taskIdentifier = event.eventId!!) { override suspend fun doExecute() { sendEventTask.execute(SendEventTask.Params(event, encrypt)) diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/TaskInfo.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/TaskInfo.kt index 87c6299c4d..a03ab778f6 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/TaskInfo.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/TaskInfo.kt @@ -36,7 +36,7 @@ internal interface TaskInfo { const val TYPE_SEND = "TYPE_SEND" const val TYPE_REDACT = "TYPE_REDACT" - val moshi = Moshi.Builder() + private val moshi = Moshi.Builder() .add(RuntimeJsonAdapterFactory.of(TaskInfo::class.java, "type", FallbackTaskInfo::class.java) .registerSubtype(SendEventTaskInfo::class.java, TYPE_SEND) .registerSubtype(RedactEventTaskInfo::class.java, TYPE_REDACT) @@ -71,6 +71,6 @@ internal data class RedactEventTaskInfo( @JsonClass(generateAdapter = true) internal data class FallbackTaskInfo( - @Json(name = "type") override val type: String = TaskInfo.TYPE_REDACT, + @Json(name = "type") override val type: String = TaskInfo.TYPE_UNKNOWN, @Json(name = "order") override val order: Int ) : TaskInfo diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/widgets/DefaultWidgetURLFormatter.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/widgets/DefaultWidgetURLFormatter.kt index db74e76b31..0937f6d18b 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/widgets/DefaultWidgetURLFormatter.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/widgets/DefaultWidgetURLFormatter.kt @@ -37,12 +37,12 @@ internal class DefaultWidgetURLFormatter @Inject constructor(private val integra private lateinit var currentConfig: IntegrationManagerConfig private var whiteListedUrls: List = emptyList() - override fun onStart() { + override fun onSessionStarted() { setupWithConfiguration() integrationManager.addListener(this) } - override fun onStop() { + override fun onSessionStopped() { integrationManager.removeListener(this) } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/widgets/WidgetManager.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/widgets/WidgetManager.kt index f841a2a245..73a4cc697d 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/widgets/WidgetManager.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/widgets/WidgetManager.kt @@ -62,12 +62,12 @@ internal class WidgetManager @Inject constructor(private val integrationManager: private val lifecycleOwner: LifecycleOwner = LifecycleOwner { lifecycleRegistry } private val lifecycleRegistry: LifecycleRegistry = LifecycleRegistry(lifecycleOwner) - override fun onStart() { + override fun onSessionStarted() { lifecycleRegistry.currentState = Lifecycle.State.STARTED integrationManager.addListener(this) } - override fun onStop() { + override fun onSessionStopped() { integrationManager.removeListener(this) lifecycleRegistry.currentState = Lifecycle.State.DESTROYED } From a0df20fcd2dcdbfd986298b75b027f4d690179ae Mon Sep 17 00:00:00 2001 From: ganfra Date: Fri, 5 Mar 2021 21:03:52 +0100 Subject: [PATCH 2/4] Send: clean code and add more logs --- .../queue/EventSenderProcessorCoroutine.kt | 47 +++++++++++-------- .../send/queue/EventSenderProcessorThread.kt | 8 ++-- .../session/room/send/queue/QueuedTask.kt | 8 +++- 3 files changed, 36 insertions(+), 27 deletions(-) diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorCoroutine.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorCoroutine.kt index 77e156f90a..56fb3b8539 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorCoroutine.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorCoroutine.kt @@ -34,12 +34,14 @@ import org.matrix.android.sdk.internal.task.TaskExecutor import org.matrix.android.sdk.internal.util.toCancelable import timber.log.Timber import java.io.IOException +import java.util.Queue import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicBoolean import javax.inject.Inject import kotlin.coroutines.cancellation.CancellationException private const val RETRY_WAIT_TIME_MS = 10_000L +private const val MAX_RETRY_COUNT = 3 /** * This class is responsible for sending events in order in each room. It uses the QueuedTask.queueIdentifier to execute tasks sequentially. @@ -58,6 +60,8 @@ internal class EventSenderProcessorCoroutine @Inject constructor( private val memento: QueueMemento, ) : EventSenderProcessor { + private val waitForNetworkSequencer = SemaphoreCoroutineSequencer() + /** * sequencers use QueuedTask.queueIdentifier as key */ @@ -109,6 +113,7 @@ internal class EventSenderProcessorCoroutine @Inject constructor( val sequencer = sequencers.getOrPut(task.queueIdentifier) { SemaphoreCoroutineSequencer() } + Timber.v("## post $task") return taskExecutor.executorScope .launchWith(sequencer) { executeTask(task) @@ -119,7 +124,7 @@ internal class EventSenderProcessorCoroutine @Inject constructor( } override fun cancel(eventId: String, roomId: String) { - // eventId is the taskIdentifier + // eventId is most likely the taskIdentifier cancelableBag[eventId]?.cancel() } @@ -132,36 +137,26 @@ internal class EventSenderProcessorCoroutine @Inject constructor( private suspend fun executeTask(task: QueuedTask) { try { if (task.isCancelled()) { - Timber.v("## task has been cancelled") + Timber.v("## $task has been cancelled, try next task") return } - waitForNetwork() - Timber.v("## execute task with id: ${task.queueIdentifier}") + task.waitForNetwork() task.execute() } catch (exception: Throwable) { when { exception is IOException || exception is Failure.NetworkConnection -> { canReachServer.set(false) - task.retryCount++ - if (task.retryCount >= 3) task.onTaskFailed() - Timber.v("## retryable error for $task reason: ${exception.localizedMessage}") - // Retry - executeTask(task) + task.markAsFailedOrRetry(exception, 0) } (exception is Failure.ServerError && exception.error.code == MatrixError.M_LIMIT_EXCEEDED) -> { - task.retryCount++ - if (task.retryCount >= 3) task.onTaskFailed() - Timber.v("## retryable error for $task reason: ${exception.localizedMessage}") - // Wait some time - delay(exception.error.retryAfterMillis?.plus(100) ?: 3_000) - // and retry - executeTask(task) + val delay = exception.error.retryAfterMillis?.plus(100) ?: 3_000 + task.markAsFailedOrRetry(exception, delay) } exception is CancellationException -> { - Timber.v("## task has been cancelled") + Timber.v("## $task has been cancelled, try next task") } else -> { - Timber.v("## Un-Retryable error, try next task") + Timber.v("## un-retryable error for $task, try next task") // this task is in error, check next one? task.onTaskFailed() } @@ -170,6 +165,18 @@ internal class EventSenderProcessorCoroutine @Inject constructor( markAsFinished(task) } + private suspend fun QueuedTask.markAsFailedOrRetry(failure: Throwable, retryDelay: Long){ + if (retryCount.incrementAndGet() >= MAX_RETRY_COUNT) { + onTaskFailed() + } else { + Timber.v("## retryable error for $this reason: ${failure.localizedMessage}") + // Wait if necessary + delay(retryDelay) + // And then retry + executeTask(this) + } + } + private fun markAsManaged(task: QueuedTask) { memento.track(task) } @@ -181,9 +188,9 @@ internal class EventSenderProcessorCoroutine @Inject constructor( private val canReachServer = AtomicBoolean(true) - private suspend fun waitForNetwork() { + private suspend fun QueuedTask.waitForNetwork() = waitForNetworkSequencer.post { while (!canReachServer.get()) { - Timber.v("## Cannot reach server, wait ts:${System.currentTimeMillis()}") + Timber.v("## $this cannot reach server wait ts:${System.currentTimeMillis()}") delay(RETRY_WAIT_TIME_MS) withContext(Dispatchers.IO) { val hostAvailable = HomeServerAvailabilityChecker(sessionParams).check() diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorThread.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorThread.kt index c3a9eb4a07..2e1acf710c 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorThread.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorThread.kt @@ -163,7 +163,7 @@ internal class EventSenderProcessorThread @Inject constructor( // so network is available runBlocking { - retryLoop@ while (task.retryCount < 3) { + retryLoop@ while (task.retryCount.get() < 3) { try { // SendPerformanceProfiler.startStage(task.event.eventId!!, SendPerformanceProfiler.Stages.SEND_WORKER) Timber.v("## SendThread retryLoop for $task retryCount ${task.retryCount}") @@ -175,8 +175,7 @@ internal class EventSenderProcessorThread @Inject constructor( when { exception is IOException || exception is Failure.NetworkConnection -> { canReachServer = false - task.retryCount++ - if (task.retryCount >= 3) task.onTaskFailed() + if (task.retryCount.getAndIncrement() >= 3) task.onTaskFailed() while (!canReachServer) { Timber.v("## SendThread retryLoop cannot reach server, wait ts:${System.currentTimeMillis()}") // schedule to retry @@ -184,8 +183,7 @@ internal class EventSenderProcessorThread @Inject constructor( } } (exception is Failure.ServerError && exception.error.code == MatrixError.M_LIMIT_EXCEEDED) -> { - task.retryCount++ - if (task.retryCount >= 3) task.onTaskFailed() + if (task.retryCount.getAndIncrement() >= 3) task.onTaskFailed() Timber.v("## SendThread retryLoop retryable error for $task reason: ${exception.localizedMessage}") // wait a bit // Todo if its a quota exception can we get timout? diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueuedTask.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueuedTask.kt index a4eb9e9323..e5302c171c 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueuedTask.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueuedTask.kt @@ -17,6 +17,8 @@ package org.matrix.android.sdk.internal.session.room.send.queue import org.matrix.android.sdk.api.util.Cancelable +import timber.log.Timber +import java.util.concurrent.atomic.AtomicInteger /** * @param queueIdentifier String value to identify a unique Queue @@ -27,15 +29,17 @@ internal abstract class QueuedTask( val taskIdentifier: String ) : Cancelable { - override fun toString() = "queueIdentifier: $queueIdentifier, taskIdentifier: ${taskIdentifier})" + override fun toString() = "${javaClass.simpleName} queueIdentifier: $queueIdentifier, taskIdentifier: ${taskIdentifier})" - var retryCount = 0 + var retryCount = AtomicInteger(0) private var hasBeenCancelled: Boolean = false suspend fun execute() { if (!isCancelled()) { + Timber.v("Execute: $this start") doExecute() + Timber.v("Execute: $this finish") } } From 3c7a108940376ff68dc88494563ce3fdc4dca1c1 Mon Sep 17 00:00:00 2001 From: ganfra Date: Tue, 9 Mar 2021 18:18:28 +0100 Subject: [PATCH 3/4] Send: clean and update CHANGES --- CHANGES.md | 1 + .../matrix/android/sdk/internal/session/SessionModule.kt | 1 - .../session/room/send/queue/EventSenderProcessor.kt | 3 +-- .../room/send/queue/EventSenderProcessorCoroutine.kt | 7 +++---- .../session/room/send/queue/EventSenderProcessorThread.kt | 5 ----- .../room/send/queue/HomeServerAvailabilityChecker.kt | 3 +-- .../sdk/internal/session/room/send/queue/QueueMemento.kt | 3 +-- .../sdk/internal/session/room/send/queue/QueuedTask.kt | 2 +- 8 files changed, 8 insertions(+), 17 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index ea78fcaf7d..8136abeffd 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -11,6 +11,7 @@ Improvements 🙌: - PIP support for Jitsi call (#2418) - Add tooltip for room quick actions - Pre-share session keys when opening a room or start typing (#2771) + - Sending is now queuing by room and not uniquely to the session Bugfix 🐛: - Try to fix crash about UrlPreview (#2640) diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/SessionModule.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/SessionModule.kt index 1b0a2fa027..f10eb67921 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/SessionModule.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/SessionModule.kt @@ -371,5 +371,4 @@ internal abstract class SessionModule { @Binds abstract fun bindEventSenderProcessor(processor: EventSenderProcessorCoroutine): EventSenderProcessor - } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessor.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessor.kt index 05d0876ef0..8bafa5f882 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessor.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessor.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 New Vector Ltd + * Copyright 2021 The Matrix.org Foundation C.I.C. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,5 +33,4 @@ internal interface EventSenderProcessor: SessionLifecycleObserver { fun postTask(task: QueuedTask): Cancelable fun cancel(eventId: String, roomId: String) - } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorCoroutine.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorCoroutine.kt index 56fb3b8539..e93ebe9048 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorCoroutine.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorCoroutine.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 New Vector Ltd + * Copyright 2021 The Matrix.org Foundation C.I.C. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,7 +34,6 @@ import org.matrix.android.sdk.internal.task.TaskExecutor import org.matrix.android.sdk.internal.util.toCancelable import timber.log.Timber import java.io.IOException -import java.util.Queue import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicBoolean import javax.inject.Inject @@ -57,7 +56,7 @@ internal class EventSenderProcessorCoroutine @Inject constructor( private val sessionParams: SessionParams, private val queuedTaskFactory: QueuedTaskFactory, private val taskExecutor: TaskExecutor, - private val memento: QueueMemento, + private val memento: QueueMemento ) : EventSenderProcessor { private val waitForNetworkSequencer = SemaphoreCoroutineSequencer() @@ -165,7 +164,7 @@ internal class EventSenderProcessorCoroutine @Inject constructor( markAsFinished(task) } - private suspend fun QueuedTask.markAsFailedOrRetry(failure: Throwable, retryDelay: Long){ + private suspend fun QueuedTask.markAsFailedOrRetry(failure: Throwable, retryDelay: Long) { if (retryCount.incrementAndGet() >= MAX_RETRY_COUNT) { onTaskFailed() } else { diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorThread.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorThread.kt index 2e1acf710c..2d2111f838 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorThread.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorThread.kt @@ -33,9 +33,6 @@ import org.matrix.android.sdk.internal.session.SessionScope import org.matrix.android.sdk.internal.task.TaskExecutor import timber.log.Timber import java.io.IOException -import java.net.InetAddress -import java.net.InetSocketAddress -import java.net.Socket import java.util.Timer import java.util.TimerTask import java.util.concurrent.LinkedBlockingQueue @@ -74,7 +71,6 @@ internal class EventSenderProcessorThread @Inject constructor( interrupt() } - override fun start() { super.start() // We should check for sending events not handled because app was killed @@ -234,5 +230,4 @@ internal class EventSenderProcessorThread @Inject constructor( } synchronized(networkAvailableLock) { networkAvailableLock.wait() } } - } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/HomeServerAvailabilityChecker.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/HomeServerAvailabilityChecker.kt index c68be74a64..2d53699917 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/HomeServerAvailabilityChecker.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/HomeServerAvailabilityChecker.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 New Vector Ltd + * Copyright 2021 The Matrix.org Foundation C.I.C. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -41,5 +41,4 @@ internal class HomeServerAvailabilityChecker(val sessionParams: SessionParams) { return false } } - } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueueMemento.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueueMemento.kt index 472e4d440f..116c8d5c6b 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueueMemento.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueueMemento.kt @@ -54,8 +54,7 @@ internal class QueueMemento @Inject constructor(context: Context, persist() } - fun trackedTasks() = synchronized(trackedTasks){ - + fun trackedTasks() = synchronized(trackedTasks) { } private fun persist() { diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueuedTask.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueuedTask.kt index e5302c171c..948786677d 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueuedTask.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueuedTask.kt @@ -29,7 +29,7 @@ internal abstract class QueuedTask( val taskIdentifier: String ) : Cancelable { - override fun toString() = "${javaClass.simpleName} queueIdentifier: $queueIdentifier, taskIdentifier: ${taskIdentifier})" + override fun toString() = "${javaClass.simpleName} queueIdentifier: $queueIdentifier, taskIdentifier: $taskIdentifier)" var retryCount = AtomicInteger(0) From 7936c2c6f841c9456d3c3526999881c5ea19eb51 Mon Sep 17 00:00:00 2001 From: ganfra Date: Tue, 9 Mar 2021 20:23:06 +0100 Subject: [PATCH 4/4] Send: clean after Benoits review --- .../session/room/send/queue/EventSenderProcessorCoroutine.kt | 2 +- .../session/room/send/queue/EventSenderProcessorThread.kt | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorCoroutine.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorCoroutine.kt index e93ebe9048..2972332989 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorCoroutine.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorCoroutine.kt @@ -118,7 +118,7 @@ internal class EventSenderProcessorCoroutine @Inject constructor( executeTask(task) }.toCancelable() .also { - cancelableBag[task.taskIdentifier] + cancelableBag[task.taskIdentifier] = it } } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorThread.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorThread.kt index 2d2111f838..b79a86dd7e 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorThread.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorThread.kt @@ -46,6 +46,7 @@ import kotlin.concurrent.schedule * * If the app is killed before all event were sent, on next wakeup the scheduled events will be re posted */ +@Deprecated("You should know use EventSenderProcessorCoroutine instead") @SessionScope internal class EventSenderProcessorThread @Inject constructor( private val cryptoService: CryptoService,