Browse Source

Replace Writer with BulkWriter

pull/1823/head
Hironsan 3 years ago
parent
commit
97353da68f
2 changed files with 3 additions and 15 deletions
  1. 4
      backend/data_import/celery_tasks.py
  2. 14
      backend/data_import/pipeline/writers.py

4
backend/data_import/celery_tasks.py

@ -12,7 +12,7 @@ from .pipeline.catalog import AudioFile, ImageFile
from .pipeline.exceptions import FileTypeException, MaximumFileSizeException
from .pipeline.factories import create_builder, create_cleaner, create_parser
from .pipeline.readers import FileName, Reader
from .pipeline.writers import BulkWriter
from .pipeline.writers import Writer
from projects.models import Project
@ -63,7 +63,7 @@ def import_dataset(user_id, project_id, file_format: str, upload_ids: List[str],
builder = create_builder(project, **kwargs)
reader = Reader(filenames=filenames, parser=parser, builder=builder)
cleaner = create_cleaner(project)
writer = BulkWriter(batch_size=settings.IMPORT_BATCH_SIZE)
writer = Writer(batch_size=settings.IMPORT_BATCH_SIZE)
writer.save(reader, project, user, cleaner)
upload_to_store(temporary_uploads)
return {"error": writer.errors + errors}

14
backend/data_import/pipeline/writers.py

@ -1,4 +1,3 @@
import abc
import itertools
from collections import defaultdict
from typing import Any, Dict, List, Type
@ -12,17 +11,6 @@ from label_types.models import CategoryType, LabelType, SpanType
from projects.models import Project
class Writer(abc.ABC):
    """Abstract interface for objects that save read contents to the DB.

    Subclasses must implement :meth:`save` and should override
    :attr:`errors`. ``errors`` is exposed as a read-only property because
    callers read it as an attribute (``writer.errors + errors``), not as a
    method call.
    """

    @abc.abstractmethod
    def save(self, reader: BaseReader, project: Project, user, cleaner):
        """Save the read contents to DB.

        Args:
            reader: Source of the contents to be saved.
            project: Project the contents are saved for.
            user: User on whose behalf the contents are saved.
            cleaner: Cleaner passed through by the caller alongside the reader.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError("Please implement this method in the subclass.")

    @property
    def errors(self) -> List[Dict[Any, Any]]:
        """Return errors.

        Declared as a property so that attribute-style access on a subclass
        that forgot to override it fails here with ``NotImplementedError``
        instead of silently yielding a bound method.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError("Please implement this method in the subclass.")
def group_by_class(instances):
groups = defaultdict(list)
for instance in instances:
@ -54,7 +42,7 @@ class Examples:
return len(self) == 0
class BulkWriter(Writer):
class Writer:
def __init__(self, batch_size: int):
self.examples = Examples(batch_size)
self._errors: List[FileParseException] = []

Loading…
Cancel
Save