synapse/tests/rest/admin/test_media.py

1093 lines
38 KiB
Python

#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2021 The Matrix.org Foundation C.I.C.
# Copyright 2020 Dirk Klimpel
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#
import os
from typing import Dict
from parameterized import parameterized
from twisted.internet.testing import MemoryReactor
from twisted.web.resource import Resource
import synapse.rest.admin
from synapse.api.errors import Codes
from synapse.media._base import FileInfo
from synapse.media.filepath import MediaFilePaths
from synapse.rest.client import login, media, profile, room
from synapse.server import HomeServer
from synapse.util.clock import Clock
from tests import unittest
from tests.test_utils import SMALL_CMYK_JPEG, SMALL_PNG
from tests.unittest import override_config
# Timestamps used as `before_ts` query parameters by the tests below.
# The admin API expects milliseconds; the second value is deliberately given in
# seconds so that the server rejects it with its "from the year 1970" error.
VALID_TIMESTAMP = 1609459200000 # 2021-01-01 in milliseconds
INVALID_TIMESTAMP_IN_S = 1893456000 # 2030-01-01 in seconds
class _AdminMediaTests(unittest.HomeserverTestCase):
    """
    Common base class for the admin media API test cases in this module.

    It registers the admin, media-repo admin, login and client media servlets
    and mounts the media repository resource under `/_matrix/media`.
    """

    servlets = [
        synapse.rest.admin.register_servlets,
        synapse.rest.admin.register_servlets_for_media_repo,
        login.register_servlets,
        media.register_servlets,
    ]

    def create_resource_dict(self) -> Dict[str, Resource]:
        """
        Extend the parent's resource mapping with the media repository resource.
        """
        resource_map = super().create_resource_dict()
        resource_map.update(
            {"/_matrix/media": self.hs.get_media_repository_resource()}
        )
        return resource_map
class QueryMediaByIDTestCase(_AdminMediaTests):
    """
    Tests for `GET /_synapse/admin/v1/media/<server_name>/<media_id>`.
    """

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        self.hs = hs
        self.clock = clock
        self.server_name = hs.hostname
        self.store = hs.get_datastores().main

        self.admin_user = self.register_user("admin", "pass", admin=True)
        self.admin_user_tok = self.login("admin", "pass")

    def _cache_remote_media(self, file_id: str) -> None:
        """
        Store SMALL_PNG as cached remote media from `remote.com` and check that
        it can be downloaded.

        Args:
            file_id: used as the media ID, the filesystem ID and (as a stand-in)
                the sha256 of the cached media.
        """
        file_info = FileInfo(server_name="remote.com", file_id=file_id)

        media_storage = self.hs.get_media_repository().media_storage

        # The async context manager is entered and exited by hand so that each
        # awaitable can be resolved through `get_success`.
        ctx = media_storage.store_into_file(file_info)
        (f, _fname) = self.get_success(ctx.__aenter__())
        f.write(SMALL_PNG)
        self.get_success(ctx.__aexit__(None, None, None))

        self.get_success(
            self.store.store_cached_remote_media(
                origin="remote.com",
                media_id=file_id,
                media_type="image/png",
                media_length=len(SMALL_PNG),
                time_now_ms=self.clock.time_msec(),
                upload_name="test.png",
                filesystem_id=file_id,
                sha256=file_id,
            )
        )

        channel = self.make_request(
            "GET",
            f"/_matrix/client/v1/media/download/remote.com/{file_id}",
            shorthand=False,
            access_token=self.admin_user_tok,
        )

        # Should be successful
        self.assertEqual(
            200,
            channel.code,
            msg=("Expected to receive a 200 on accessing media"),
        )

    def test_no_auth(self) -> None:
        """
        Try to query media without authentication.
        """
        url = f"/_synapse/admin/v1/media/{self.server_name}/12345"
        channel = self.make_request("GET", url)

        self.assertEqual(
            401,
            channel.code,
            msg=channel.json_body,
        )
        self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])

    def test_requester_is_no_admin(self) -> None:
        """
        If the user is not a server admin, an error is returned.
        """
        self.other_user = self.register_user("user", "pass")
        self.other_user_token = self.login("user", "pass")

        channel = self.make_request(
            "GET",
            f"/_synapse/admin/v1/media/{self.server_name}/12345",
            access_token=self.other_user_token,
        )

        self.assertEqual(403, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])

    def test_local_media_does_not_exist(self) -> None:
        """
        Tests that a lookup for local media that does not exist returns a 404
        """
        channel = self.make_request(
            "GET",
            f"/_synapse/admin/v1/media/{self.server_name}/12345",
            access_token=self.admin_user_tok,
        )

        self.assertEqual(404, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])

    def test_remote_media_does_not_exist(self) -> None:
        """
        Tests that a lookup for remote media that is not cached returns a 404
        """
        # Query a remote origin (the same one `_cache_remote_media` uses) so
        # that this test covers the remote lookup rather than repeating the
        # local-media case.
        channel = self.make_request(
            "GET",
            "/_synapse/admin/v1/media/remote.com/12345",
            access_token=self.admin_user_tok,
        )

        self.assertEqual(404, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])

    def test_query_local_media(self) -> None:
        """
        Tests that querying an existing local media returns appropriate media info
        """
        # Upload some media
        response = self.helper.upload_media(
            SMALL_PNG,
            tok=self.admin_user_tok,
            expect_code=200,
        )

        # Extract media ID from the response
        server_and_media_id = response["content_uri"][6:]  # Cut off 'mxc://'
        server_name, media_id = server_and_media_id.split("/")
        self.assertEqual(server_name, self.server_name)

        channel = self.make_request(
            "GET",
            f"/_synapse/admin/v1/media/{self.server_name}/{media_id}",
            access_token=self.admin_user_tok,
        )

        self.assertEqual(200, channel.code, msg=channel.json_body)
        media_info = channel.json_body["media_info"]
        self.assertEqual(media_info["authenticated"], True)
        self.assertEqual(media_info["media_id"], media_id)
        self.assertEqual(media_info["media_length"], len(SMALL_PNG))
        # The uploaded payload is a PNG, so the stored type must be image/png.
        self.assertEqual(media_info["media_type"], "image/png")
        self.assertEqual(media_info["upload_name"], "test.png")
        self.assertEqual(media_info["user_id"], "@admin:test")

    def test_query_remote_media(self) -> None:
        """
        Tests that querying cached remote media returns appropriate media info
        """
        file_id = "abcdefg12345"
        self._cache_remote_media(file_id)

        channel = self.make_request(
            "GET",
            f"/_synapse/admin/v1/media/remote.com/{file_id}",
            access_token=self.admin_user_tok,
        )

        self.assertEqual(200, channel.code, msg=channel.json_body)
        media_info = channel.json_body["media_info"]
        self.assertEqual(media_info["authenticated"], True)
        self.assertEqual(media_info["media_id"], file_id)
        self.assertEqual(media_info["media_length"], len(SMALL_PNG))
        self.assertEqual(media_info["media_type"], "image/png")
        self.assertEqual(media_info["upload_name"], "test.png")
        self.assertEqual(media_info["media_origin"], "remote.com")
class DeleteMediaByIDTestCase(_AdminMediaTests):
    """
    Tests for `DELETE /_synapse/admin/v1/media/<server_name>/<media_id>`.
    """

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        self.server_name = hs.hostname

        self.admin_user = self.register_user("admin", "pass", admin=True)
        self.admin_user_tok = self.login("admin", "pass")

        self.filepaths = MediaFilePaths(hs.config.media.media_store_path)

    def test_no_auth(self) -> None:
        """
        Deleting media without an access token is rejected with a 401.
        """
        channel = self.make_request(
            "DELETE",
            f"/_synapse/admin/v1/media/{self.server_name}/12345",
            b"{}",
        )

        self.assertEqual(401, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])

    def test_requester_is_no_admin(self) -> None:
        """
        A user who is not a server admin gets a 403.
        """
        self.other_user = self.register_user("user", "pass")
        self.other_user_token = self.login("user", "pass")

        channel = self.make_request(
            "DELETE",
            f"/_synapse/admin/v1/media/{self.server_name}/12345",
            access_token=self.other_user_token,
        )

        self.assertEqual(403, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])

    def test_media_does_not_exist(self) -> None:
        """
        Deleting local media that does not exist returns a 404.
        """
        channel = self.make_request(
            "DELETE",
            f"/_synapse/admin/v1/media/{self.server_name}/12345",
            access_token=self.admin_user_tok,
        )

        self.assertEqual(404, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])

    def test_media_is_not_local(self) -> None:
        """
        Deleting media on a server name other than our own returns a 400.
        """
        channel = self.make_request(
            "DELETE",
            "/_synapse/admin/v1/media/unknown_domain/12345",
            access_token=self.admin_user_tok,
        )

        self.assertEqual(400, channel.code, msg=channel.json_body)
        self.assertEqual("Can only delete local media", channel.json_body["error"])

    @override_config({"enable_authenticated_media": False})
    def test_delete_media(self) -> None:
        """
        Uploaded media can be downloaded, then deleted, after which both the
        download and the file on disk are gone.
        """
        upload = self.helper.upload_media(
            SMALL_PNG,
            tok=self.admin_user_tok,
            expect_code=200,
        )

        # Strip the leading 'mxc://' to get '<server_name>/<media_id>'
        mxc_path = upload["content_uri"][6:]
        origin, media_id = mxc_path.split("/")
        self.assertEqual(origin, self.server_name)

        download_path = f"/_matrix/media/v3/download/{mxc_path}"

        # The media is retrievable before deletion ...
        channel = self.make_request(
            "GET",
            download_path,
            shorthand=False,
            access_token=self.admin_user_tok,
        )
        self.assertEqual(
            200,
            channel.code,
            msg=f"Expected to receive a 200 on accessing media: {mxc_path}",
        )

        # ... and its file is present on disk.
        local_path = self.filepaths.local_media_filepath(media_id)
        self.assertTrue(os.path.exists(local_path))

        # Delete the media through the admin API
        channel = self.make_request(
            "DELETE",
            f"/_synapse/admin/v1/media/{self.server_name}/{media_id}",
            access_token=self.admin_user_tok,
        )
        self.assertEqual(200, channel.code, msg=channel.json_body)
        self.assertEqual(1, channel.json_body["total"])
        self.assertEqual(media_id, channel.json_body["deleted_media"][0])

        # The media can no longer be downloaded ...
        channel = self.make_request(
            "GET",
            download_path,
            shorthand=False,
            access_token=self.admin_user_tok,
        )
        self.assertEqual(
            404,
            channel.code,
            msg=f"Expected to receive a 404 on accessing deleted media: {mxc_path}",
        )

        # ... and the file has been removed from disk.
        self.assertFalse(os.path.exists(local_path))
class DeleteMediaByDateSizeTestCase(_AdminMediaTests):
    """
    Tests for `POST /_synapse/admin/v1/media/delete` and the legacy
    `POST /_synapse/admin/v1/media/<server_name>/delete` URL.
    """

    servlets = [
        synapse.rest.admin.register_servlets,
        synapse.rest.admin.register_servlets_for_media_repo,
        login.register_servlets,
        profile.register_servlets,
        room.register_servlets,
    ]

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        self.media_repo = hs.get_media_repository_resource()
        self.server_name = hs.hostname

        self.admin_user = self.register_user("admin", "pass", admin=True)
        self.admin_user_tok = self.login("admin", "pass")

        self.filepaths = MediaFilePaths(hs.config.media.media_store_path)
        self.url = "/_synapse/admin/v1/media/delete"
        self.legacy_url = f"/_synapse/admin/v1/media/{self.server_name}/delete"

        # Move clock up to somewhat realistic time
        self.reactor.advance(1000000000)

    def test_no_auth(self) -> None:
        """
        Deleting media without an access token is rejected with a 401.
        """
        channel = self.make_request("POST", self.url, b"{}")

        self.assertEqual(401, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])

    def test_requester_is_no_admin(self) -> None:
        """
        A user who is not a server admin gets a 403.
        """
        self.other_user = self.register_user("user", "pass")
        self.other_user_token = self.login("user", "pass")

        channel = self.make_request(
            "POST",
            self.url,
            access_token=self.other_user_token,
        )

        self.assertEqual(403, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])

    def test_media_is_not_local(self) -> None:
        """
        The legacy URL with a server name other than our own returns a 400.
        """
        channel = self.make_request(
            "POST",
            f"/_synapse/admin/v1/media/unknown_domain/delete?before_ts={VALID_TIMESTAMP}",
            access_token=self.admin_user_tok,
        )

        self.assertEqual(400, channel.code, msg=channel.json_body)
        self.assertEqual("Can only delete local media", channel.json_body["error"])

    def test_missing_parameter(self) -> None:
        """
        If the parameter `before_ts` is missing, an error is returned.
        """
        channel = self.make_request(
            "POST",
            self.url,
            access_token=self.admin_user_tok,
        )

        self.assertEqual(400, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.MISSING_PARAM, channel.json_body["errcode"])
        self.assertEqual(
            "Missing required integer query parameter before_ts",
            channel.json_body["error"],
        )

    def test_invalid_parameter(self) -> None:
        """
        Each invalid query parameter is rejected with a 400 and a specific message.
        """
        # (query string, expected error message), checked in this order.
        invalid_cases = [
            (
                "?before_ts=-1234",
                "Query parameter before_ts must be a positive integer.",
            ),
            (
                f"?before_ts={INVALID_TIMESTAMP_IN_S}",
                "Query parameter before_ts you provided is from the year 1970. "
                + "Double check that you are providing a timestamp in milliseconds.",
            ),
            (
                f"?before_ts={VALID_TIMESTAMP}&size_gt=-1234",
                "Query parameter size_gt must be a positive integer.",
            ),
            (
                f"?before_ts={VALID_TIMESTAMP}&keep_profiles=not_bool",
                "Boolean query parameter 'keep_profiles' must be one of ['true', 'false']",
            ),
        ]

        for query, expected_error in invalid_cases:
            channel = self.make_request(
                "POST",
                self.url + query,
                access_token=self.admin_user_tok,
            )

            self.assertEqual(400, channel.code, msg=channel.json_body)
            self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
            self.assertEqual(expected_error, channel.json_body["error"])

    # NOTE(review): unlike the other tests in this class that call
    # `_access_media`, this one does not override `enable_authenticated_media`.
    # Confirm that the final 404 check is attributable to the deletion.
    @parameterized.expand([(True,), (False,)])
    def test_delete_media_never_accessed(self, use_legacy_url: bool) -> None:
        """
        Tests that media is deleted if it is older than `before_ts` and was
        never accessed: `last_access_ts` is `NULL` and `created_ts` < `before_ts`
        """
        url = self.legacy_url if use_legacy_url else self.url

        # Upload and do not access
        server_and_media_id = self._create_media()
        self.pump(1.0)

        # The file must exist on disk before the purge
        media_id = server_and_media_id.split("/")[1]
        local_path = self.filepaths.local_media_filepath(media_id)
        self.assertTrue(os.path.exists(local_path))

        # Timestamp after upload/create
        now_ms = self.clock.time_msec()
        self._purge_and_check(f"{url}?before_ts={now_ms}", media_id)

        self._access_media(server_and_media_id, False)

    @override_config({"enable_authenticated_media": False})
    def test_keep_media_by_date(self) -> None:
        """
        Tests that media is not deleted if it is newer than `before_ts`
        """
        # Timestamp before upload
        before_upload_ms = self.clock.time_msec()
        server_and_media_id = self._create_media()

        self._access_media(server_and_media_id)

        self._purge_and_check(f"{self.url}?before_ts={before_upload_ms}")

        self._access_media(server_and_media_id)

        # Timestamp after upload
        after_upload_ms = self.clock.time_msec()
        self._purge_and_check(
            f"{self.url}?before_ts={after_upload_ms}",
            server_and_media_id.split("/")[1],
        )

        self._access_media(server_and_media_id, False)

    @override_config({"enable_authenticated_media": False})
    def test_keep_media_by_size(self) -> None:
        """
        Tests that media is not deleted if its size is smaller than or equal
        to `size_gt`
        """
        server_and_media_id = self._create_media()

        self._access_media(server_and_media_id)

        now_ms = self.clock.time_msec()
        self._purge_and_check(f"{self.url}?before_ts={now_ms}&size_gt=67")

        self._access_media(server_and_media_id)

        now_ms = self.clock.time_msec()
        self._purge_and_check(
            f"{self.url}?before_ts={now_ms}&size_gt=66",
            server_and_media_id.split("/")[1],
        )

        self._access_media(server_and_media_id, False)

    @override_config({"enable_authenticated_media": False})
    def test_keep_media_by_user_avatar(self) -> None:
        """
        Tests that we do not delete media if it is used as a user avatar
        Tests parameter `keep_profiles`
        """
        server_and_media_id = self._create_media()

        self._access_media(server_and_media_id)

        # Set media as the admin's avatar
        channel = self.make_request(
            "PUT",
            f"/profile/{self.admin_user}/avatar_url",
            content={"avatar_url": f"mxc://{server_and_media_id}"},
            access_token=self.admin_user_tok,
        )
        self.assertEqual(200, channel.code, msg=channel.json_body)

        now_ms = self.clock.time_msec()
        self._purge_and_check(f"{self.url}?before_ts={now_ms}&keep_profiles=true")

        self._access_media(server_and_media_id)

        now_ms = self.clock.time_msec()
        self._purge_and_check(
            f"{self.url}?before_ts={now_ms}&keep_profiles=false",
            server_and_media_id.split("/")[1],
        )

        self._access_media(server_and_media_id, False)

    @override_config({"enable_authenticated_media": False})
    def test_keep_media_by_room_avatar(self) -> None:
        """
        Tests that we do not delete media if it is used as a room avatar
        Tests parameter `keep_profiles`
        """
        server_and_media_id = self._create_media()

        self._access_media(server_and_media_id)

        # Set media as room avatar
        room_id = self.helper.create_room_as(self.admin_user, tok=self.admin_user_tok)
        channel = self.make_request(
            "PUT",
            f"/rooms/{room_id}/state/m.room.avatar",
            content={"url": f"mxc://{server_and_media_id}"},
            access_token=self.admin_user_tok,
        )
        self.assertEqual(200, channel.code, msg=channel.json_body)

        now_ms = self.clock.time_msec()
        self._purge_and_check(f"{self.url}?before_ts={now_ms}&keep_profiles=true")

        self._access_media(server_and_media_id)

        now_ms = self.clock.time_msec()
        self._purge_and_check(
            f"{self.url}?before_ts={now_ms}&keep_profiles=false",
            server_and_media_id.split("/")[1],
        )

        self._access_media(server_and_media_id, False)

    def _purge_and_check(self, url: str, *expected_deleted: str) -> None:
        """
        POST to `url` as the admin and check the reported deletions.

        Asserts a 200 response whose `total` equals the number of expected
        media IDs and, if any are expected, that the first entry of
        `deleted_media` is the first expected ID.
        """
        channel = self.make_request(
            "POST",
            url,
            access_token=self.admin_user_tok,
        )
        self.assertEqual(200, channel.code, msg=channel.json_body)
        self.assertEqual(len(expected_deleted), channel.json_body["total"])
        if expected_deleted:
            self.assertEqual(
                expected_deleted[0],
                channel.json_body["deleted_media"][0],
            )

    def _create_media(self) -> str:
        """
        Upload SMALL_PNG as the admin and return `<server_name>/<media_id>`
        """
        response = self.helper.upload_media(
            SMALL_PNG,
            tok=self.admin_user_tok,
            expect_code=200,
        )

        # Extract '<server_name>/<media_id>' from the response
        server_and_media_id = response["content_uri"][6:]  # Cut off 'mxc://'
        origin = server_and_media_id.split("/")[0]

        # Check that new media is local and not remote
        self.assertEqual(origin, self.server_name)

        return server_and_media_id

    def _access_media(
        self, server_and_media_id: str, expect_success: bool = True
    ) -> None:
        """
        Try to download a media and check both the response code and whether
        the file exists on disk.
        """
        media_id = server_and_media_id.split("/")[1]
        local_path = self.filepaths.local_media_filepath(media_id)

        channel = self.make_request(
            "GET",
            f"/_matrix/media/v3/download/{server_and_media_id}",
            shorthand=False,
            access_token=self.admin_user_tok,
        )

        if not expect_success:
            self.assertEqual(
                404,
                channel.code,
                msg=(
                    "Expected to receive a 404 on accessing deleted media: "
                    f"{server_and_media_id}"
                ),
            )
            # Test that the file is deleted
            self.assertFalse(os.path.exists(local_path))
            return

        self.assertEqual(
            200,
            channel.code,
            msg=f"Expected to receive a 200 on accessing media: {server_and_media_id}",
        )
        # Test that the file exists
        self.assertTrue(os.path.exists(local_path))
class QuarantineMediaByIDTestCase(_AdminMediaTests):
    """
    Tests for
    `POST /_synapse/admin/v1/media/(quarantine|unquarantine)/<server_name>/<media_id>`.
    """

    def upload_media_and_return_media_id(self, data: bytes) -> str:
        """
        Upload `data` as the admin and return the media ID of the new upload.
        """
        response = self.helper.upload_media(
            data,
            tok=self.admin_user_tok,
            expect_code=200,
        )
        # Drop the leading 'mxc://' and keep the part after '<server_name>/'
        mxc_path = response["content_uri"][6:]
        return mxc_path.split("/")[1]

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        self.store = hs.get_datastores().main
        self.server_name = hs.hostname

        self.admin_user = self.register_user("admin", "pass", admin=True)
        self.admin_user_tok = self.login("admin", "pass")

        # Three uploads of the same content, plus one with different content.
        self.media_id = self.upload_media_and_return_media_id(SMALL_PNG)
        self.media_id_2 = self.upload_media_and_return_media_id(SMALL_PNG)
        self.media_id_3 = self.upload_media_and_return_media_id(SMALL_PNG)
        self.media_id_other = self.upload_media_and_return_media_id(SMALL_CMYK_JPEG)

        self.url = "/_synapse/admin/v1/media/%s/%s/%s"

    def _admin_action(self, action: str) -> None:
        """
        POST `action` for `self.media_id` as the admin and expect a 200 with
        an empty JSON body.
        """
        channel = self.make_request(
            "POST",
            self.url % (action, self.server_name, self.media_id),
            access_token=self.admin_user_tok,
        )
        self.assertEqual(200, channel.code, msg=channel.json_body)
        self.assertFalse(channel.json_body)

    def _check_quarantined(self, media_id: str, expected: bool) -> None:
        """
        Load the local media row and check the truthiness of `quarantined_by`.
        """
        media_info = self.get_success(self.store.get_local_media(media_id))
        assert media_info is not None
        if expected:
            self.assertTrue(media_info.quarantined_by)
        else:
            self.assertFalse(media_info.quarantined_by)

    @parameterized.expand(["quarantine", "unquarantine"])
    def test_no_auth(self, action: str) -> None:
        """
        Try to quarantine/unquarantine media without authentication.
        """
        channel = self.make_request(
            "POST",
            self.url % (action, self.server_name, self.media_id),
            b"{}",
        )

        self.assertEqual(401, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])

    @parameterized.expand(["quarantine", "unquarantine"])
    def test_requester_is_no_admin(self, action: str) -> None:
        """
        A user who is not a server admin gets a 403.
        """
        self.other_user = self.register_user("user", "pass")
        self.other_user_token = self.login("user", "pass")

        channel = self.make_request(
            "POST",
            self.url % (action, self.server_name, self.media_id),
            access_token=self.other_user_token,
        )

        self.assertEqual(403, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])

    def test_quarantine_media(self) -> None:
        """
        Tests that media can be quarantined and then removed from quarantine
        """
        self._check_quarantined(self.media_id, False)

        # Quarantine
        self._admin_action("quarantine")
        self._check_quarantined(self.media_id, True)

        # Remove from quarantine
        self._admin_action("unquarantine")
        self._check_quarantined(self.media_id, False)

    def test_quarantine_media_match_hash(self) -> None:
        """
        Tests that quarantining one media also quarantines the other uploads
        of the same content, and that unquarantining resets them all
        """
        same_content_ids = [self.media_id, self.media_id_2, self.media_id_3]

        self._check_quarantined(self.media_id, False)

        # Quarantine
        self._admin_action("quarantine")

        # ALL similar media must be quarantined ...
        for media_item in same_content_ids:
            self._check_quarantined(media_item, True)

        # ... while the media with different content is left alone.
        self._check_quarantined(self.media_id_other, False)

        # Remove from quarantine
        self._admin_action("unquarantine")

        # ALL similar media must now be reset.
        for media_item in same_content_ids:
            self._check_quarantined(media_item, False)

    def test_quarantine_protected_media(self) -> None:
        """
        Tests that quarantining media marked as safe leaves it unquarantined
        """
        # Protect
        self.get_success(self.store.mark_local_media_as_safe(self.media_id, safe=True))

        # Verify protection
        media_info = self.get_success(self.store.get_local_media(self.media_id))
        assert media_info is not None
        self.assertTrue(media_info.safe_from_quarantine)

        # The request itself still succeeds with an empty body ...
        self._admin_action("quarantine")

        # ... but the media is not in quarantine.
        self._check_quarantined(self.media_id, False)
class ProtectMediaByIDTestCase(_AdminMediaTests):
    """
    Tests for `POST /_synapse/admin/v1/media/(protect|unprotect)/<media_id>`.
    """

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        hs.get_media_repository_resource()
        self.store = hs.get_datastores().main

        self.admin_user = self.register_user("admin", "pass", admin=True)
        self.admin_user_tok = self.login("admin", "pass")

        # Upload the media that the tests will protect/unprotect
        upload = self.helper.upload_media(
            SMALL_PNG,
            tok=self.admin_user_tok,
            expect_code=200,
        )
        # Drop the leading 'mxc://' and keep the part after '<server_name>/'
        mxc_path = upload["content_uri"][6:]
        self.media_id = mxc_path.split("/")[1]

        self.url = "/_synapse/admin/v1/media/%s/%s"

    def _admin_action(self, action: str) -> None:
        """
        POST `action` for `self.media_id` as the admin and expect a 200 with
        an empty JSON body.
        """
        channel = self.make_request(
            "POST",
            self.url % (action, self.media_id),
            access_token=self.admin_user_tok,
        )
        self.assertEqual(200, channel.code, msg=channel.json_body)
        self.assertFalse(channel.json_body)

    def _check_protected(self, expected: bool) -> None:
        """
        Load the local media row and check the truthiness of `safe_from_quarantine`.
        """
        media_info = self.get_success(self.store.get_local_media(self.media_id))
        assert media_info is not None
        if expected:
            self.assertTrue(media_info.safe_from_quarantine)
        else:
            self.assertFalse(media_info.safe_from_quarantine)

    @parameterized.expand(["protect", "unprotect"])
    def test_no_auth(self, action: str) -> None:
        """
        Try to protect/unprotect media without authentication.
        """
        channel = self.make_request("POST", self.url % (action, self.media_id), b"{}")

        self.assertEqual(401, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])

    @parameterized.expand(["protect", "unprotect"])
    def test_requester_is_no_admin(self, action: str) -> None:
        """
        A user who is not a server admin gets a 403.
        """
        self.other_user = self.register_user("user", "pass")
        self.other_user_token = self.login("user", "pass")

        channel = self.make_request(
            "POST",
            self.url % (action, self.media_id),
            access_token=self.other_user_token,
        )

        self.assertEqual(403, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])

    def test_protect_media(self) -> None:
        """
        Tests that media can be protected and then unprotected again
        """
        self._check_protected(False)

        # Protect
        self._admin_action("protect")
        self._check_protected(True)

        # Unprotect
        self._admin_action("unprotect")
        self._check_protected(False)
class PurgeMediaCacheTestCase(_AdminMediaTests):
    """
    Tests for `POST /_synapse/admin/v1/purge_media_cache`.
    """

    servlets = [
        synapse.rest.admin.register_servlets,
        synapse.rest.admin.register_servlets_for_media_repo,
        login.register_servlets,
        profile.register_servlets,
        room.register_servlets,
    ]

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        self.media_repo = hs.get_media_repository_resource()
        self.server_name = hs.hostname

        self.admin_user = self.register_user("admin", "pass", admin=True)
        self.admin_user_tok = self.login("admin", "pass")

        self.filepaths = MediaFilePaths(hs.config.media.media_store_path)
        self.url = "/_synapse/admin/v1/purge_media_cache"

    def test_no_auth(self) -> None:
        """
        Purging the media cache without an access token is rejected with a 401.
        """
        channel = self.make_request("POST", self.url, b"{}")

        self.assertEqual(401, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])

    def test_requester_is_not_admin(self) -> None:
        """
        A user who is not a server admin gets a 403.
        """
        self.other_user = self.register_user("user", "pass")
        self.other_user_token = self.login("user", "pass")

        channel = self.make_request(
            "POST",
            self.url,
            access_token=self.other_user_token,
        )

        self.assertEqual(403, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])

    def test_invalid_parameter(self) -> None:
        """
        Each invalid `before_ts` value is rejected with a 400 and a specific message.
        """
        # (query string, expected error message), checked in this order.
        invalid_cases = [
            (
                "?before_ts=-1234",
                "Query parameter before_ts must be a positive integer.",
            ),
            (
                f"?before_ts={INVALID_TIMESTAMP_IN_S}",
                "Query parameter before_ts you provided is from the year 1970. "
                + "Double check that you are providing a timestamp in milliseconds.",
            ),
        ]

        for query, expected_error in invalid_cases:
            channel = self.make_request(
                "POST",
                self.url + query,
                access_token=self.admin_user_tok,
            )

            self.assertEqual(400, channel.code, msg=channel.json_body)
            self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
            self.assertEqual(expected_error, channel.json_body["error"])