You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

137 lines
4.3 KiB

3 years ago
3 years ago
  1. import itertools
  2. from celery import shared_task
  3. from celery.utils.log import get_task_logger
  4. from django.conf import settings
  5. from django.contrib.auth import get_user_model
  6. from django.shortcuts import get_object_or_404
  7. from .models import Example, Label, Project
  8. from .views.download.factory import create_repository, create_writer
  9. from .views.download.service import ExportApplicationService
  10. from .views.upload.exception import FileParseException
  11. from .views.upload.factory import (get_data_class, get_dataset_class,
  12. get_label_class)
  13. from .views.upload.utils import append_field
  14. logger = get_task_logger(__name__)
  15. class Buffer:
  16. def __init__(self, buffer_size=settings.IMPORT_BATCH_SIZE):
  17. self.buffer_size = buffer_size
  18. self.buffer = []
  19. def __len__(self):
  20. return len(self.buffer)
  21. @property
  22. def data(self):
  23. return self.buffer
  24. def add(self, data):
  25. self.buffer.append(data)
  26. def clear(self):
  27. self.buffer = []
  28. def is_full(self):
  29. return len(self) >= self.buffer_size
  30. def is_empty(self):
  31. return len(self) == 0
  32. class DataFactory:
  33. def __init__(self, data_class, label_class, annotation_class):
  34. self.data_class = data_class
  35. self.label_class = label_class
  36. self.annotation_class = annotation_class
  37. def create_label(self, examples, project):
  38. flatten = itertools.chain(*[example.label for example in examples])
  39. labels = {
  40. label['text'] for label in flatten
  41. if not project.labels.filter(text=label['text']).exists()
  42. }
  43. labels = [self.label_class(text=text, project=project) for text in labels]
  44. self.label_class.objects.bulk_create(labels)
  45. def create_data(self, examples, project):
  46. dataset = [
  47. self.data_class(project=project, **example.data)
  48. for example in examples
  49. ]
  50. results = self.data_class.objects.bulk_create(dataset)
  51. return results
  52. def create_annotation(self, examples, ids, user, project):
  53. mapping = {label.text: label.id for label in project.labels.all()}
  54. annotation = [example.annotation(mapping) for example in examples]
  55. for a, id in zip(annotation, ids):
  56. append_field(a, example=id)
  57. annotation = list(itertools.chain(*annotation))
  58. for a in annotation:
  59. if 'label' in a:
  60. a['label_id'] = a.pop('label')
  61. annotation = [self.annotation_class(**a, user=user) for a in annotation]
  62. self.annotation_class.objects.bulk_create(annotation)
  63. def create(self, examples, user, project):
  64. self.create_label(examples, project)
  65. ids = self.create_data(examples, project)
  66. self.create_annotation(examples, ids, user, project)
  67. @shared_task
  68. def injest_data(user_id, project_id, filenames, format: str, **kwargs):
  69. project = get_object_or_404(Project, pk=project_id)
  70. user = get_object_or_404(get_user_model(), pk=user_id)
  71. response = {'error': []}
  72. # Prepare dataset.
  73. dataset_class = get_dataset_class(format)
  74. dataset = dataset_class(
  75. filenames=filenames,
  76. label_class=get_label_class(project.project_type),
  77. data_class=get_data_class(project.project_type),
  78. **kwargs
  79. )
  80. it = iter(dataset)
  81. buffer = Buffer()
  82. factory = DataFactory(
  83. data_class=Example,
  84. label_class=Label,
  85. annotation_class=project.get_annotation_class()
  86. )
  87. while True:
  88. try:
  89. example = next(it)
  90. except StopIteration:
  91. break
  92. except FileParseException as err:
  93. response['error'].append(err.dict())
  94. continue
  95. buffer.add(example)
  96. if buffer.is_full():
  97. factory.create(buffer.data, user, project)
  98. buffer.clear()
  99. if not buffer.is_empty():
  100. logger.debug(f'BUFFER LEN {len(buffer)}')
  101. factory.create(buffer.data, user, project)
  102. buffer.clear()
  103. return response
  104. @shared_task
  105. def export_dataset(project_id, format: str, export_approved=False):
  106. project = get_object_or_404(Project, pk=project_id)
  107. repository = create_repository(project)
  108. writer = create_writer(format)(settings.MEDIA_ROOT)
  109. service = ExportApplicationService(repository, writer)
  110. filepath = service.export(export_approved)
  111. return filepath