import os
from datetime import datetime
import asyncio
from blake3 import blake3
from pathlib import Path
from pymongo import MongoClient
from evalquiz_proto.shared.exceptions import (
FirstDataChunkNotMetadataException,
NoMimetypeMappingException,
)
from evalquiz_proto.shared.generated import (
MaterialServerBase,
Empty,
ListOfStrings,
MaterialUploadData,
Metadata,
String,
)
from grpclib.server import Server
from typing import AsyncIterator
from evalquiz_proto.shared.mimetype_resolver import MimetypeResolver
from evalquiz_proto.shared.path_dictionary_controller import (
PathDictionaryController,
)
import betterproto
[docs]class MaterialServerService(MaterialServerBase):
"""Serves endpoints for material manipulation."""
def __init__(
self,
material_storage_path: Path,
path_dictionary_controller: PathDictionaryController = PathDictionaryController(
MongoClient("material-server-db", 27017)
),
) -> None:
"""Constructor of MaterialServerService.
Creates `material_storage_path` folder, if not existent.
Args:
material_storage_path (Path): Specifies the path where lecture materials are stored.
path_dictionary_controller: (PathDictionaryController): A custom material controller can be passed as an argument, otherwise a PathDictionaryController with default arguments is initialized.
"""
self.material_storage_path = material_storage_path
self.path_dictionary_controller = path_dictionary_controller
if not os.path.exists(material_storage_path):
os.mkdir(material_storage_path)
[docs] async def upload_material(
self, material_upload_data_iterator: AsyncIterator["MaterialUploadData"]
) -> "Empty":
"""Asynchronous method that is used by gRPC as an endpoint.
Manages a lecture material upload.
Note on how local_path is built: The file extension is added to the hash to enable mimetype recognition when loading the lecture material:
When PathDictionaryController.load_file is invoked.
Args:
material_upload_data_iterator (AsyncIterator[MaterialUploadData]): An Iterator which elements represent packages of the stream. Includes a Metadata instance as the first element and data in bytes as the following elements.
Raises:
FirstDataChunkNotMetadataException: Raised, if the first element is not a Metadata instance.
NoMimetypeMappingException: The mimetype in lecture_material.file_type could not be mapped to a file extension.
Returns:
Empty: Empty gRPC compatible return format. Equivalent to "None".
"""
material_upload_data = await material_upload_data_iterator.__anext__()
(type, metadata) = betterproto.which_one_of(
material_upload_data, "material_upload_data"
)
if metadata is not None and type == "metadata":
extension = MimetypeResolver.fixed_guess_extension(metadata.mimetype)
if extension is None:
raise NoMimetypeMappingException()
async_iterator_bytes = self._to_async_iterator_bytes(
material_upload_data_iterator
)
load_local_path = await self._load_from_binary_iterator(
async_iterator_bytes
)
hash = self._calculate_hash(load_local_path)
local_path = self.material_storage_path / hash
local_path = local_path.parent / (local_path.name + extension)
self.path_dictionary_controller.copy_and_load_file(
load_local_path, local_path, hash, metadata.name
)
return Empty()
raise FirstDataChunkNotMetadataException()
async def _load_from_binary_iterator(
self, binary_iterator: AsyncIterator[bytes]
) -> Path:
"""Loads file from binary into into `/tmp` folder.
Args:
binary_iterator (AsyncIterator[bytes]): Binary iterator to work with.
Returns:
Path: Path to the file in `/tmp`.
"""
timestamp = datetime.utcnow().isoformat()
local_path = Path("/tmp/current_evalquiz_upload_" + timestamp)
with open(local_path, "ab") as local_file:
local_file.truncate(0)
while True:
try:
data = await binary_iterator.__anext__()
local_file.write(data)
except StopAsyncIteration:
break
return local_path
def _calculate_hash(self, local_path: Path) -> str:
"""Calculates blake3 hash of local file.
Args:
local_path (Path): Path to local file.
Returns:
str: Resulting hash.
"""
with open(local_path, "rb") as local_file:
file_content = local_file.read()
return blake3(file_content).hexdigest()
async def _to_async_iterator_bytes(
self, material_upload_data_iterator: AsyncIterator[MaterialUploadData]
) -> AsyncIterator[bytes]:
"""Converts AsyncIterator[MaterialUploadData] to AsyncIterator[bytes] by runtime type checking/assertions.
Args:
material_upload_data_iterator (AsyncIterator[MaterialUploadData]): Iterator of MaterialUploadData.
Returns:
AsyncIterator[bytes]: Iterator of bytes.
"""
material_upload_data = await material_upload_data_iterator.__anext__()
(type, data) = betterproto.which_one_of(
material_upload_data, "material_upload_data"
)
if data is not None and type == "data":
yield data
else:
TypeError(
"AsyncIterator[MaterialUploadData] cannot be converted into AsyncIterator[bytes]."
)
[docs] async def delete_material(self, string: "String") -> "Empty":
"""Asynchronous method that is used by gRPC as an endpoint.
Manages deletion of lecture materials
Args:
string (String): The hash of the lecture material.
Returns:
Empty: Empty gRPC compatible return format. Equivalent to "None".
"""
self.path_dictionary_controller.delete_file(string.value)
return Empty()
[docs] async def get_material_hashes(self, empty: "Empty") -> "ListOfStrings":
"""Asynchronous method that is used by gRPC as an endpoint.
Returns hashes of all registered lecture materials.
Args:
empty (Empty): Empty gRPC compatible return format. Equivalent to "None". Required as parameter.
Returns:
ListOfStrings: Hashes of all registered lecture materials.
"""
material_hashes = self.path_dictionary_controller.get_material_hashes()
return ListOfStrings(material_hashes)
[docs] async def get_material_name(self, string: "String") -> "String":
"""Asynchronous method that is used by gRPC as an endpoint.
Returns material name of a lecture material hash.
Args:
string (String): The hash of the lecture material.
Returns:
String: Material name, as specified in PathDictionaryController.
"""
hash = string.value
name = self.path_dictionary_controller.get_material_name(hash)
return String(name)
[docs] async def get_material(
self, string: "String"
) -> "AsyncIterator[MaterialUploadData]":
"""Asynchronous method that is used by gRPC as an endpoint.
Returns a specific lecture material for a hash.
Args:
string (String): Hash of the material to query.
Returns:
AsyncIterator[MaterialUploadData]: An Iterator which elements represent packages of the stream. Includes LectureMaterial as the first element and data in bytes as the following elements.
"""
(
mimetype,
material_upload_iterator,
) = await self.path_dictionary_controller.get_file_from_hash_async(string.value)
metadata = Metadata(mimetype, string.value)
yield MaterialUploadData(metadata=metadata)
while True:
try:
data = await material_upload_iterator.__anext__()
material_upload_data = MaterialUploadData(data=data)
yield material_upload_data
except StopAsyncIteration:
break
[docs]async def main() -> None:
"""Sets up and starts MaterialServerService."""
material_storage_path = Path(__file__).parent / "lecture_materials"
if not os.path.exists(material_storage_path):
os.makedirs(material_storage_path)
server = Server([MaterialServerService(material_storage_path)])
await server.start("0.0.0.0", 50051)
print("Server started at port 50051.", flush=True)
await server.wait_closed()
if __name__ == "__main__":
"""Prevents accidental execution of the server via import."""
loop = asyncio.get_event_loop()
loop.run_until_complete(main())