You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

73 lines
2.7 KiB

2 years ago
2 years ago
  1. from typing import List
  2. import filetype
  3. from celery import shared_task
  4. from django.conf import settings
  5. from django.contrib.auth import get_user_model
  6. from django.shortcuts import get_object_or_404
  7. from django_drf_filepond.api import store_upload
  8. from django_drf_filepond.models import TemporaryUpload
  9. from .datasets import load_dataset
  10. from .pipeline.catalog import Format, create_file_format
  11. from .pipeline.exceptions import (
  12. FileImportException,
  13. FileTypeException,
  14. MaximumFileSizeException,
  15. )
  16. from .pipeline.readers import FileName
  17. from projects.models import Project
  18. def check_file_type(filename, file_format: Format, filepath: str):
  19. if not settings.ENABLE_FILE_TYPE_CHECK:
  20. return
  21. kind = filetype.guess(filepath)
  22. if not file_format.validate_mime(kind.mime):
  23. raise FileTypeException(filename, kind.mime, file_format.accept_types)
  24. def check_uploaded_files(upload_ids: List[str], file_format: Format):
  25. errors: List[FileImportException] = []
  26. cleaned_ids = []
  27. temporary_uploads = TemporaryUpload.objects.filter(upload_id__in=upload_ids)
  28. for tu in temporary_uploads:
  29. if tu.file.size > settings.MAX_UPLOAD_SIZE:
  30. errors.append(MaximumFileSizeException(tu.upload_name, settings.MAX_UPLOAD_SIZE))
  31. tu.delete()
  32. continue
  33. try:
  34. check_file_type(tu.upload_name, file_format, tu.get_file_path())
  35. except FileTypeException as e:
  36. errors.append(e)
  37. tu.delete()
  38. continue
  39. cleaned_ids.append(tu.upload_id)
  40. return cleaned_ids, errors
  41. @shared_task(autoretry_for=(Exception,), retry_backoff=True, retry_jitter=True)
  42. def import_dataset(user_id, project_id, file_format: str, upload_ids: List[str], task: str, **kwargs):
  43. project = get_object_or_404(Project, pk=project_id)
  44. user = get_object_or_404(get_user_model(), pk=user_id)
  45. try:
  46. fmt = create_file_format(file_format)
  47. upload_ids, errors = check_uploaded_files(upload_ids, fmt)
  48. temporary_uploads = TemporaryUpload.objects.filter(upload_id__in=upload_ids)
  49. filenames = [
  50. FileName(full_path=tu.get_file_path(), generated_name=tu.file.name, upload_name=tu.upload_name)
  51. for tu in temporary_uploads
  52. ]
  53. dataset = load_dataset(task, fmt, filenames, project, **kwargs)
  54. dataset.save(user, batch_size=settings.IMPORT_BATCH_SIZE)
  55. upload_to_store(temporary_uploads)
  56. errors.extend(dataset.errors)
  57. return {"error": [e.dict() for e in errors]}
  58. except FileImportException as e:
  59. return {"error": [e.dict()]}
  60. def upload_to_store(temporary_uploads):
  61. for tu in temporary_uploads:
  62. store_upload(tu.upload_id, destination_file_path=tu.file.name)