You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

136 lines
4.3 KiB

3 years ago
  1. import datetime
  2. import itertools
  3. from celery import shared_task
  4. from django.conf import settings
  5. from django.contrib.auth import get_user_model
  6. from django.shortcuts import get_object_or_404
  7. from .models import Document, Label, Project
  8. from .views.download.factory import create_repository, create_writer
  9. from .views.download.service import ExportApplicationService
  10. from .views.upload.exception import FileParseException
  11. from .views.upload.factory import (get_data_class, get_dataset_class,
  12. get_label_class)
  13. from .views.upload.utils import append_field
  class Buffer:
      """Accumulate items in a list until a configured capacity is reached.

      The buffer never rejects items: ``is_full`` only reports that the
      capacity has been reached so the caller can flush and ``clear`` it.
      """

      def __init__(self, buffer_size=None):
          """Create an empty buffer.

          Args:
              buffer_size: Number of items at which ``is_full`` becomes true.
                  When omitted (``None``), ``settings.IMPORT_BATCH_SIZE`` is
                  read here, at construction time. Binding it as the default
                  argument would freeze the value at import time and ignore
                  any later settings override.
          """
          if buffer_size is None:
              buffer_size = settings.IMPORT_BATCH_SIZE
          self.buffer_size = buffer_size
          self.buffer = []

      def __len__(self):
          """Return the number of buffered items."""
          return len(self.buffer)

      @property
      def data(self):
          """Return the underlying list of buffered items (not a copy)."""
          return self.buffer

      def add(self, data):
          """Append one item to the buffer."""
          self.buffer.append(data)

      def clear(self):
          """Drop all buffered items.

          A new list is bound instead of emptying the old one in place, so a
          list previously obtained through ``data`` keeps its contents.
          """
          self.buffer = []

      def is_full(self):
          """Return True once the item count has reached ``buffer_size``."""
          return len(self) >= self.buffer_size

      def is_empty(self):
          """Return True when no items are buffered."""
          return len(self) == 0
  class DataFactory:
      """Persist a batch of parsed examples as labels, data rows and annotations.

      The three model classes are injected; each is expected to expose a
      manager at ``.objects`` that supports ``bulk_create``.
      """

      def __init__(self, data_class, label_class, annotation_class):
          """Store the model classes used for data rows, labels and annotations."""
          self.data_class = data_class
          self.label_class = label_class
          self.annotation_class = annotation_class

      def create_label(self, examples, project):
          """Create the labels used by ``examples`` that ``project`` lacks.

          Args:
              examples: Iterable of objects whose ``label`` attribute is an
                  iterable of dicts carrying a ``'text'`` key.
              project: Object whose ``labels`` related manager lists the
                  labels that already exist.
          """
          # Fetch the existing texts once instead of issuing one EXISTS query
          # per label occurrence in the batch. The comparison below is an
          # exact, case-sensitive match done in Python.
          existing = set(project.labels.values_list('text', flat=True))
          wanted = {
              label['text']
              for label in itertools.chain.from_iterable(
                  example.label for example in examples
              )
          }
          new_labels = [
              self.label_class(text=text, project=project)
              for text in wanted - existing
          ]
          self.label_class.objects.bulk_create(new_labels)

      def create_data(self, examples, project):
          """Insert one data row per example and return the rows in example order.

          Args:
              examples: Iterable of objects whose ``data`` attribute is a dict
                  of keyword arguments for ``data_class``.
              project: Project the rows are attached to.

          Returns:
              A list of ``data_class`` instances, one per example.
          """
          dataset = [
              self.data_class(project=project, **example.data)
              for example in examples
          ]
          if not dataset:
              return []

          # NOTE(review): naive local timestamp kept from the original code; if
          # the project runs with USE_TZ=True, django.utils.timezone.now() is
          # probably the right clock -- confirm.
          started_at = datetime.datetime.now()
          created = self.data_class.objects.bulk_create(dataset)

          # When the backend reports primary keys on the returned instances,
          # they are exactly the rows just inserted, in insertion order. No
          # re-query is needed and concurrently created rows cannot leak in.
          if all(row.pk is not None for row in created):
              return list(created)

          # Fallback for backends that do not report primary keys: look the
          # rows up by creation time. Scoping to this project and ordering by
          # pk keeps rows from other projects out and makes the order
          # deterministic. A concurrent import into the same project can still
          # interleave here.
          inserted = self.data_class.objects.filter(
              project=project,
              created_at__gte=started_at,
          ).order_by('pk')
          return list(inserted)

      def create_annotation(self, examples, ids, user, project):
          """Create the annotations of ``examples`` for the given data rows.

          Args:
              examples: Iterable of objects exposing ``annotation(mapping)``,
                  where ``mapping`` maps label text to label id.
              ids: Data rows as returned by ``create_data``, paired
                  positionally with ``examples``.
              user: Value stored as ``user`` on every annotation.
              project: Project whose labels provide the text-to-id mapping.
          """
          label_ids = {label.text: label.id for label in project.labels.all()}
          per_example = [example.annotation(label_ids) for example in examples]

          # append_field is an imported helper; presumably it adds the
          # ``document`` field to every annotation dict of the example --
          # verify against its definition.
          for annotations, document in zip(per_example, ids):
              append_field(annotations, document=document)

          rows = []
          for fields in itertools.chain.from_iterable(per_example):
              # The mapping yields label ids, so pass them as the raw foreign
              # key value rather than as a label instance.
              if 'label' in fields:
                  fields['label_id'] = fields.pop('label')
              rows.append(self.annotation_class(**fields, user=user))
          self.annotation_class.objects.bulk_create(rows)

      def create(self, examples, user, project):
          """Persist labels, then data rows, then annotations for one batch."""
          self.create_label(examples, project)
          ids = self.create_data(examples, project)
          self.create_annotation(examples, ids, user, project)
  @shared_task
  def injest_data(user_id, project_id, filenames, format: str, **kwargs):
      """Import the given files into a project in batches.

      Examples are read one at a time from the dataset built for ``format``
      and flushed to the database through ``DataFactory`` whenever the buffer
      fills up, plus once more at the end for the remainder.

      Args:
          user_id: Primary key of the user recorded on created annotations.
          project_id: Primary key of the target project.
          filenames: Passed through to the dataset class.
          format: Name used to select the dataset class.
          **kwargs: Extra options forwarded to the dataset class.

      Returns:
          A dict ``{'error': [...]}`` holding ``err.dict()`` for every
          ``FileParseException`` raised while reading examples.
      """
      project = get_object_or_404(Project, pk=project_id)
      user = get_object_or_404(get_user_model(), pk=user_id)
      response = {'error': []}

      # Prepare dataset.
      parser_class = get_dataset_class(format)
      examples = iter(parser_class(
          filenames=filenames,
          label_class=get_label_class(project.project_type),
          data_class=get_data_class(project.project_type),
          **kwargs
      ))

      pending = Buffer()
      factory = DataFactory(
          data_class=Document,
          label_class=Label,
          annotation_class=project.get_annotation_class()
      )

      # A plain ``for`` loop cannot recover from an exception raised while
      # fetching an item, so the iterator is advanced by hand.
      exhausted = False
      while not exhausted:
          try:
              example = next(examples)
          except StopIteration:
              exhausted = True
          except FileParseException as err:
              # Record the parse failure and keep reading.
              response['error'].append(err.dict())
          else:
              pending.add(example)
              if pending.is_full():
                  factory.create(pending.data, user, project)
                  pending.clear()

      # Flush whatever is left over from the last, partially filled batch.
      if not pending.is_empty():
          factory.create(pending.data, user, project)
          pending.clear()

      return response
  @shared_task
  def export_dataset(project_id, format: str, export_approved=False):
      """Export a project's dataset and return the value produced by the export.

      Args:
          project_id: Primary key of the project to export.
          format: Name used to select the writer class.
          export_approved: Forwarded unchanged to the export service.

      Returns:
          Whatever ``ExportApplicationService.export`` returns (named
          ``filepath`` in the original code).
      """
      # The repository is built before the writer, as in the original
      # statement order.
      exporter = ExportApplicationService(
          create_repository(get_object_or_404(Project, pk=project_id)),
          create_writer(format)(settings.MEDIA_ROOT),
      )
      return exporter.export(export_approved)