import csv
import io
import json
import os
from typing import Any, Dict, Iterator, List, Tuple

import chardet
import pyexcel
import pyexcel.exceptions
from chardet import UniversalDetector
from seqeval.scheme import BILOU, IOB2, IOBES, IOE2, Tokens

from .exceptions import FileParseException
from .readers import DEFAULT_LABEL_COLUMN, DEFAULT_TEXT_COLUMN, Parser

DEFAULT_ENCODING = "Auto"

def detect_encoding(filename: str, buffer_size: int = io.DEFAULT_BUFFER_SIZE) -> str:
    """Detects character encoding automatically.

    For the list of supported encodings, see the following document:
    https://chardet.readthedocs.io/en/latest/supported-encodings.html

    Args:
        filename: The file whose encoding is detected.
        buffer_size: The buffer size used to read the file contents incrementally.

    Returns:
        The character encoding.
    """
    # For a small file, read it at once.
    if os.path.getsize(filename) < buffer_size:
        with open(filename, "rb") as f:
            detected = chardet.detect(f.read())
        return detected.get("encoding") or "utf-8"

    # For a large file, call the Universal Encoding Detector incrementally.
    # It will stop as soon as it is confident enough to report its results.
    # See: https://chardet.readthedocs.io/en/latest/usage.html
    with open(filename, "rb") as f:
        detector = UniversalDetector()
        while True:
            binary = f.read(buffer_size)
            detector.feed(binary)
            if binary == b"":
                break
            if detector.done:
                break
        if detector.done:
            return detector.result["encoding"] or "utf-8"
        else:
            return "utf-8"

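# Usage sketch (illustrative only, not part of the original module): detecting
# the encoding of a small temporary file. The sample text is an arbitrary
# assumption for demonstration purposes.
def _example_detect_encoding() -> str:
    import tempfile

    with tempfile.NamedTemporaryFile(suffix=".txt", delete=False) as f:
        f.write("こんにちは、世界".encode("utf-8"))
    # A file smaller than the buffer size is read in one shot; larger files
    # are fed to UniversalDetector chunk by chunk.
    return detect_encoding(f.name)
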
def decide_encoding(filename: str, encoding: str) -> str:
    """Decide the character encoding.

    If the encoding is DEFAULT_ENCODING, detect it automatically.
    Otherwise, return it as is.

    Args:
        filename: The filename for deciding the encoding.
        encoding: The specified encoding.

    Returns:
        The character encoding.
    """
    if encoding == DEFAULT_ENCODING:
        return detect_encoding(filename)
    else:
        return encoding

class LineReader:
    """LineReader is a helper class to read a file line by line.

    Attributes:
        filename: The filename to read.
        encoding: The character encoding.
    """

    def __init__(self, filename: str, encoding: str = DEFAULT_ENCODING):
        self.filename = filename
        self.encoding = encoding

    def __iter__(self) -> Iterator[str]:
        encoding = decide_encoding(self.filename, self.encoding)
        with open(self.filename, encoding=encoding) as f:
            for line in f:
                yield line.rstrip()

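# Usage sketch (illustrative only, not part of the original module): LineReader
# opens the file lazily on iteration, decides the encoding via decide_encoding,
# and yields lines with trailing whitespace stripped. "corpus.txt" is a
# hypothetical path.
def _example_line_reader() -> List[str]:
    reader = LineReader("corpus.txt", encoding=DEFAULT_ENCODING)
    return list(reader)
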
class PlainParser(Parser):
    """PlainParser is a parser that simply returns an empty dictionary.

    This is for a task without any text.
    """

    def __init__(self, **kwargs):
        self.kwargs = kwargs

    def parse(self, filename: str) -> Iterator[Dict[Any, Any]]:
        yield {}

class LineParser(Parser):
    """LineParser is a parser to read a file line by line.

    Attributes:
        encoding: The character encoding.
    """

    def __init__(self, encoding: str = DEFAULT_ENCODING, **kwargs):
        self.encoding = encoding

    def parse(self, filename: str) -> Iterator[Dict[Any, Any]]:
        reader = LineReader(filename, self.encoding)
        for line in reader:
            yield {DEFAULT_TEXT_COLUMN: line}

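# Usage sketch (illustrative only, not part of the original module): every line
# of a hypothetical "corpus.txt" becomes one record keyed by DEFAULT_TEXT_COLUMN.
def _example_line_parser() -> List[Dict[Any, Any]]:
    parser = LineParser(encoding=DEFAULT_ENCODING)
    return list(parser.parse("corpus.txt"))
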
class TextFileParser(Parser):
    """TextFileParser is a parser to read the entire content of a file.

    Attributes:
        encoding: The character encoding.
    """

    def __init__(self, encoding: str = DEFAULT_ENCODING, **kwargs):
        self.encoding = encoding

    def parse(self, filename: str) -> Iterator[Dict[Any, Any]]:
        encoding = decide_encoding(filename, self.encoding)
        with open(filename, encoding=encoding) as f:
            yield {DEFAULT_TEXT_COLUMN: f.read()}

class CSVParser(Parser):
    """CSVParser is a parser to read a CSV file and return its rows.

    Attributes:
        encoding: The character encoding.
        delimiter: A one-character string used to separate fields. It defaults to ','.
    """

    def __init__(self, encoding: str = DEFAULT_ENCODING, delimiter: str = ",", **kwargs):
        self.encoding = encoding
        self.delimiter = delimiter

    def parse(self, filename: str) -> Iterator[Dict[Any, Any]]:
        encoding = decide_encoding(filename, self.encoding)
        with open(filename, encoding=encoding) as f:
            reader = csv.DictReader(f, delimiter=self.delimiter)
            for row in reader:
                yield row

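# Usage sketch (illustrative only, not part of the original module): for a
# hypothetical "reviews.csv" whose first row is a header such as "text,label",
# csv.DictReader yields each remaining row as a dict keyed by those headers.
def _example_csv_parser() -> List[Dict[Any, Any]]:
    parser = CSVParser(encoding="utf-8", delimiter=",")
    return list(parser.parse("reviews.csv"))
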
class JSONParser(Parser):
    """JSONParser is a parser to read a JSON file and return its rows.

    Attributes:
        encoding: The character encoding.
    """

    def __init__(self, encoding: str = DEFAULT_ENCODING, **kwargs):
        self.encoding = encoding
        self._errors: List[FileParseException] = []

    def parse(self, filename: str) -> Iterator[Dict[Any, Any]]:
        encoding = decide_encoding(filename, self.encoding)
        with open(filename, encoding=encoding) as f:
            try:
                rows = json.load(f)
                for row in rows:
                    yield row
            except json.decoder.JSONDecodeError as e:
                error = FileParseException(filename, line_num=1, message=str(e))
                self._errors.append(error)

    @property
    def errors(self) -> List[FileParseException]:
        return self._errors

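# Usage sketch (illustrative only, not part of the original module): the file
# (a hypothetical "records.json") is expected to contain a JSON array of
# objects. A decode error is not raised; it is collected on the errors property.
def _example_json_parser() -> List[Dict[Any, Any]]:
    parser = JSONParser(encoding="utf-8")
    records = list(parser.parse("records.json"))
    if parser.errors:
        print(parser.errors[0])
    return records
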
class JSONLParser(Parser):
    """JSONLParser is a parser to read a JSONL file and return its rows.

    Attributes:
        encoding: The character encoding.
    """

    def __init__(self, encoding: str = DEFAULT_ENCODING, **kwargs):
        self.encoding = encoding
        self._errors: List[FileParseException] = []

    def parse(self, filename: str) -> Iterator[Dict[Any, Any]]:
        reader = LineReader(filename, self.encoding)
        for line_num, line in enumerate(reader, start=1):
            try:
                yield json.loads(line)
            except json.decoder.JSONDecodeError as e:
                error = FileParseException(filename, line_num, str(e))
                self._errors.append(error)

    @property
    def errors(self) -> List[FileParseException]:
        return self._errors

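# Usage sketch (illustrative only, not part of the original module): each line
# of a hypothetical "records.jsonl" is decoded independently, so a single
# malformed line only adds a FileParseException (with its line number) to
# errors while the remaining lines are still yielded.
def _example_jsonl_parser() -> List[Dict[Any, Any]]:
    parser = JSONLParser(encoding=DEFAULT_ENCODING)
    return list(parser.parse("records.jsonl"))
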
class ExcelParser(Parser):
    """ExcelParser is a parser to read an Excel file."""

    def __init__(self, **kwargs):
        self._errors: List[FileParseException] = []

    def parse(self, filename: str) -> Iterator[Dict[Any, Any]]:
        rows = pyexcel.iget_records(file_name=filename)
        try:
            for line_num, row in enumerate(rows, start=1):
                yield row
        except pyexcel.exceptions.FileTypeNotSupported as e:
            error = FileParseException(filename, line_num=1, message=str(e))
            self._errors.append(error)

    @property
    def errors(self) -> List[FileParseException]:
        return self._errors

class FastTextParser(Parser):
    """FastTextParser is a parser to read the fastText format and return a text and its labels.

    The example format is as follows:
    __label__positive I really enjoyed this restaurant.
    This format expects the categories first, each prefixed with `__label__`,
    followed by the input text.

    Attributes:
        encoding: The character encoding.
        label: The label prefix. It defaults to `__label__`.
    """

    def __init__(self, encoding: str = DEFAULT_ENCODING, label: str = "__label__", **kwargs):
        self.encoding = encoding
        self.label = label

    def parse(self, filename: str) -> Iterator[Dict[Any, Any]]:
        reader = LineReader(filename, self.encoding)
        for line in reader:
            labels = []
            tokens = []
            for token in line.rstrip().split(" "):
                if token.startswith(self.label):
                    label_name = token[len(self.label) :]
                    labels.append(label_name)
                else:
                    tokens.append(token)
            text = " ".join(tokens)
            yield {DEFAULT_TEXT_COLUMN: text, DEFAULT_LABEL_COLUMN: labels}

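# Usage sketch (illustrative only, not part of the original module): for a
# hypothetical "reviews.txt" containing the line
#   __label__positive __label__food I really enjoyed this restaurant.
# the parser yields one record whose DEFAULT_TEXT_COLUMN value is
# "I really enjoyed this restaurant." and whose DEFAULT_LABEL_COLUMN value is
# ["positive", "food"].
def _example_fasttext_parser() -> List[Dict[Any, Any]]:
    parser = FastTextParser(encoding=DEFAULT_ENCODING, label="__label__")
    return list(parser.parse("reviews.txt"))
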
class CoNLLParser(Parser):
    """CoNLLParser is a parser to read a CoNLL-like format and return a text and its labels.

    The example format is as follows:
    EU B-ORG
    rejects O
    German B-MISC
    call O
    to O
    boycott O
    British B-MISC
    lamb O
    . O

    Peter B-PER
    Blackburn I-PER
    This format expects a token in the first column and a tag in the second column;
    the columns are separated by a tab. Each sentence is separated by a blank line.

    Attributes:
        encoding: The character encoding.
        delimiter: The string used to join tokens into the text. It defaults to ' '.
        scheme: The tagging scheme. It supports `IOB2`, `IOE2`, `IOBES`, and `BILOU`.
    """

    def __init__(self, encoding: str = DEFAULT_ENCODING, delimiter: str = " ", scheme: str = "IOB2", **kwargs):
        self.encoding = encoding
        self.delimiter = delimiter
        mapping = {"IOB2": IOB2, "IOE2": IOE2, "IOBES": IOBES, "BILOU": BILOU}
        self._errors: List[FileParseException] = []
        if scheme in mapping:
            self.scheme = mapping[scheme]
        else:
            self.scheme = None

    @property
    def errors(self) -> List[FileParseException]:
        return self._errors

    def parse(self, filename: str) -> Iterator[Dict[Any, Any]]:
        if not self.scheme:
            message = "The specified scheme is not supported."
            error = FileParseException(filename, line_num=1, message=message)
            self._errors.append(error)
            return

        reader = LineReader(filename, self.encoding)
        words, tags = [], []
        for line_num, line in enumerate(reader, start=1):
            line = line.rstrip()
            if line:
                tokens = line.split("\t")
                if len(tokens) != 2:
                    message = "A line must be separated by a tab and have two columns."
                    self._errors.append(FileParseException(filename, line_num, message))
                    return
                word, tag = tokens
                words.append(word)
                tags.append(tag)
            else:
                yield self.create_record(tags, words)
                words, tags = [], []
        # Flush the last sentence when the file does not end with a blank line.
        if words:
            yield self.create_record(tags, words)

    def create_record(self, tags: List[str], words: List[str]) -> Dict[Any, Any]:
        text = self.delimiter.join(words)
        labels = self.align_span(words, tags)
        return {DEFAULT_TEXT_COLUMN: text, DEFAULT_LABEL_COLUMN: labels}

    def align_span(self, words: List[str], tags: List[str]) -> List[Tuple[int, int, str]]:
        """Convert token-level tags into character-level (start, end, tag) spans."""
        tokens = Tokens(tags, self.scheme)
        labels = []
        for entity in tokens.entities:
            # The start offset is the length of the text preceding the entity,
            # plus one delimiter if there is any preceding text.
            text = self.delimiter.join(words[: entity.start])
            start = len(text) + len(self.delimiter) if text else len(text)
            chunk = words[entity.start : entity.end]
            text = self.delimiter.join(chunk)
            end = start + len(text)
            labels.append((start, end, entity.tag))
        return labels
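
# Usage sketch (illustrative only, not part of the original module): for a
# hypothetical tab-separated "sample.conll" containing the sentence
#   EU<TAB>B-ORG
#   rejects<TAB>O
#   German<TAB>B-MISC
# the parser joins the tokens into the text "EU rejects German" and align_span
# converts the IOB2 tags into the character spans [(0, 2, "ORG"), (11, 17, "MISC")].
def _example_conll_parser() -> List[Dict[Any, Any]]:
    parser = CoNLLParser(encoding=DEFAULT_ENCODING, delimiter=" ", scheme="IOB2")
    return list(parser.parse("sample.conll"))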