diff --git a/backend/data_import/celery_tasks.py b/backend/data_import/celery_tasks.py
index e7f8fb25..2ae90b6c 100644
--- a/backend/data_import/celery_tasks.py
+++ b/backend/data_import/celery_tasks.py
@@ -8,16 +8,10 @@ from django.shortcuts import get_object_or_404
 from django_drf_filepond.api import store_upload
 from django_drf_filepond.models import TemporaryUpload
 
+from .datasets import load_dataset
 from .pipeline.catalog import AudioFile, ImageFile
 from .pipeline.exceptions import FileTypeException, MaximumFileSizeException
-from .pipeline.factories import (
-    create_builder,
-    create_cleaner,
-    create_parser,
-    select_examples,
-)
-from .pipeline.readers import FileName, Reader
-from .pipeline.writers import Writer
+from .pipeline.readers import FileName
 from projects.models import Project
 
 
@@ -41,13 +35,15 @@ def check_uploaded_files(upload_ids: List[str], file_format: str):
     temporary_uploads = TemporaryUpload.objects.filter(upload_id__in=upload_ids)
     for tu in temporary_uploads:
         if tu.file.size > settings.MAX_UPLOAD_SIZE:
-            errors.append(MaximumFileSizeException(tu.upload_name, settings.MAX_UPLOAD_SIZE).dict())
+            errors.append(MaximumFileSizeException(tu.upload_name, settings.MAX_UPLOAD_SIZE))
             tu.delete()
             continue
         try:
             check_file_type(tu.upload_name, file_format, tu.get_file_path())
         except FileTypeException as e:
-            errors.append(e.dict())
+            errors.append(e)
+            tu.delete()
+            continue
         cleaned_ids.append(tu.upload_id)
     return cleaned_ids, errors
 
@@ -64,15 +60,11 @@ def import_dataset(user_id, project_id, file_format: str, upload_ids: List[str],
         for tu in temporary_uploads
     ]
 
-    parser = create_parser(file_format, **kwargs)
-    builder = create_builder(project, **kwargs)
-    cleaner = create_cleaner(project)
-    reader = Reader(filenames=filenames, parser=parser, builder=builder, cleaner=cleaner)
-    writer = Writer(batch_size=settings.IMPORT_BATCH_SIZE)
-    examples = select_examples(project)
-    writer.save(reader, project, user, examples)
+    dataset = load_dataset(file_format, filenames, project, **kwargs)
+    dataset.save(user, batch_size=settings.IMPORT_BATCH_SIZE)
     upload_to_store(temporary_uploads)
-    return {"error": reader.errors + errors}
+    errors.extend(dataset.errors)
+    return {"error": [e.dict() for e in errors]}
 
 
 def upload_to_store(temporary_uploads):
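
Note (reviewer context, not part of the patch): the new `backend/data_import/datasets.py` module behind `from .datasets import load_dataset` is not included in this hunk. A minimal sketch of the interface that `import_dataset` above relies on — a `load_dataset(file_format, filenames, project, **kwargs)` factory returning an object with `save(user, batch_size=...)` and an `errors` list whose items expose `.dict()` — could look like the following; every name other than `load_dataset`, `Dataset.save`, and `Dataset.errors` is hypothetical.

```python
# Hypothetical sketch of the interface assumed by import_dataset(); the real
# backend/data_import/datasets.py is not shown in this diff.
from typing import Any, Iterable, List


class ParseError(Exception):
    # Stand-in for the pipeline exceptions: items in Dataset.errors are assumed
    # to expose .dict(), matching how the task serializes them at the end.
    def __init__(self, filename: str, message: str):
        self.filename = filename
        self.message = message

    def dict(self) -> dict:
        return {"filename": self.filename, "message": self.message}


class Dataset:
    # Reads parsed records and persists them in batches, collecting errors
    # instead of raising so the caller can report them all at once.
    def __init__(self, records: Iterable[dict], project: Any):
        self._records = records
        self.project = project
        self.errors: List[ParseError] = []

    def save(self, user: Any, batch_size: int = 1000) -> None:
        batch: List[dict] = []
        for record in self._records:
            batch.append(record)
            if len(batch) >= batch_size:
                self._flush(batch, user)
                batch = []
        if batch:
            self._flush(batch, user)

    def _flush(self, batch: List[dict], user: Any) -> None:
        # Placeholder: a real implementation would bulk-create Example rows
        # (and labels) for the project, appending a ParseError to self.errors
        # for any record it cannot persist.
        pass


def load_dataset(file_format: str, filenames: List[Any], project: Any, **kwargs) -> Dataset:
    # Placeholder factory: a real implementation would pick a parser for
    # file_format and a Dataset variant for the project type, then stream
    # records out of the uploaded files.
    return Dataset(records=[], project=project)
```

With that shape, `dataset.errors` and the exceptions collected by `check_uploaded_files` can be serialized in one place, which is why the task now returns `[e.dict() for e in errors]` instead of converting each exception as it is appended.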