'''
Functions that parse the annotated datasets used in this project. The
datasets are the following:

1. `Li Dong <http://goo.gl/5Enpu7>`_, parsed by :py:func:`bella.parsers.dong`
2. SemEval 2014, parsed by :py:func:`bella.parsers.semeval_14`
3. SemEval 2015/16, parsed by :py:func:`bella.parsers.semeval_15_16`
4. The UK election Twitter corpus, parsed by :py:func:`bella.parsers.election`
5. The Hu and Liu review datasets, parsed by :py:func:`bella.parsers.hu_liu`
6. The Mitchell et al. Twitter dataset, parsed by
   :py:func:`bella.parsers.mitchel`
'''
import json
import os
import re
import xml.etree.ElementTree as ET
from typing import Tuple
from pathlib import Path

import ftfy

from bella.data_types import Target, TargetCollection


def dong(file_path: Path, **target_collection_kwargs) -> TargetCollection:
    '''
    Given the file path to the `Li Dong
    <https://github.com/bluemonk482/tdparse/tree/master/data/lidong>`_
    sentiment data it will parse the data and return it as a
    TargetCollection of Target instances.

    :param file_path: File path to the annotated data.
    :param target_collection_kwargs: Keywords to pass to the TargetCollection
                                     constructor that is returned.
    :returns: A TargetCollection containing Target instances.
    '''
    file_path = os.path.abspath(file_path)
    if not os.path.isfile(file_path):
        raise FileNotFoundError('This file does not exist {}'
                                .format(file_path))
    file_name, _ = os.path.splitext(os.path.basename(file_path))
    sentiment_range = [-1, 0, 1]

    sentiment_data = TargetCollection(**target_collection_kwargs)
    with open(file_path, 'r') as dong_file:
        sent_dict = {}
        # The file is grouped into three lines per example: text, target
        # and sentiment.
        for index, line in enumerate(dong_file):
            divisible = index + 1
            line = line.strip()
            if divisible % 3 == 1:
                sent_dict['text'] = line
            elif divisible % 3 == 2:
                sent_dict['target'] = line
            elif divisible % 3 == 0:
                sentiment = int(line)
                if sentiment not in sentiment_range:
                    raise ValueError('The sentiment has to be one of the '
                                     'following values {} not {}'
                                     .format(sentiment_range, sentiment))
                sent_dict['sentiment'] = sentiment
                text = sent_dict['text'].lower()
                target = sent_dict['target'].lower()
                # Find all occurrences of the target in the text. For
                # multi-word targets also search for the target with its
                # whitespace removed.
                offsets = [match.span() for match in
                           re.finditer(target, text)]
                if len(target.split()) > 1:
                    joined_target = ''.join(target.split())
                    offsets.extend([match.span() for match in
                                    re.finditer(joined_target, text)])
                sent_dict['spans'] = offsets
                sent_id = file_name + str(len(sentiment_data))
                # Sentence ID is the same as the target ID as there is
                # only one target per sentence
                sent_dict['sentence_id'] = sent_id
                sent_dict['target_id'] = sent_id
                sent_target = Target(**sent_dict)
                sentiment_data.add(sent_target)
                sent_dict = {}
            else:
                raise Exception('The file {} is not in the expected '
                                'three-lines-per-example format'
                                .format(file_path))
    return sentiment_data
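

# A minimal usage sketch for `dong` (the file path here is hypothetical).
# As parsed above, the file holds three lines per example: the sentence
# text, the target phrase and a sentiment value in {-1, 0, 1}:
#
#     train_data = dong('data/lidong/train.raw')
#     print(len(train_data))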


def _semeval_extract_data(sentences, file_name, conflict=False,
                          sentence_ids_skip=None,
                          raise_error_no_category=False,
                          **target_collection_kwargs) -> TargetCollection:
    '''
    :param sentences: A `sentences` named element
    :param file_name: Name of the file being parsed
    :param conflict: Determine whether to keep targets that have the \
    `conflict` sentiment label.
    :param sentence_ids_skip: IDs of sentences that should be skipped
    :param raise_error_no_category: Whether to raise a KeyError when an \
    opinion element has no `category` attribute.
    :type sentences: xml.etree.ElementTree.Element
    :type file_name: String
    :type conflict: bool. Default False
    :type sentence_ids_skip: list. Default None
    :type raise_error_no_category: bool. Default False
    :returns: A TargetCollection containing Target instances.
    :rtype: TargetCollection
    '''
    # Converts the sentiment tags from Strings to ints
    sentiment_mapper = {'conflict' : -2, 'negative' : -1,
                        'neutral' : 0, 'positive' : 1}

    def extract_aspect_terms(aspect_terms, sentence_id):
        '''
        :param aspect_terms: An aspectTerms element within the xml tree
        :param sentence_id: Id of the sentence that the aspects came from.
        :type aspect_terms: xml.etree.ElementTree.Element
        :type sentence_id: String
        :returns: A list of dictionaries containing id, span, sentiment and \
        target
        :rtype: list
        '''
        aspect_terms_data = []
        for index, aspect_term in enumerate(aspect_terms):
            aspect_term = aspect_term.attrib
            aspect_term_data = {}
            sentiment = sentiment_mapper[aspect_term['polarity']]
            if sentiment == -2 and not conflict:
                continue
            aspect_id = '{}{}'.format(sentence_id, index)
            aspect_term_data['target_id'] = aspect_id
            if 'term' in aspect_term:
                aspect_term_data['target'] = aspect_term['term']
            elif 'target' in aspect_term:
                aspect_term_data['target'] = aspect_term['target']
            else:
                raise KeyError('There is no `target` attribute in the '
                               'opinions element {}'.format(aspect_term))
            # Extract the category if it exists
            if 'category' in aspect_term:
                aspect_term_data['category'] = aspect_term['category']
            elif raise_error_no_category:
                raise KeyError('There is no `category` attribute in the '
                               f'opinions element {aspect_term}')
            aspect_term_data['sentiment'] = sentiment
            aspect_term_data['spans'] = [(int(aspect_term['from']),
                                          int(aspect_term['to']))]
            aspect_term_data['sentence_id'] = sentence_id
            # If the target is NULL then there is no target
            if aspect_term_data['target'] == 'NULL':
                continue
            aspect_terms_data.append(aspect_term_data)
        return aspect_terms_data

    def add_text(aspect_data, text):
        '''
        :param aspect_data: A list of dicts containing `span`, `target` and \
        `sentiment` keys.
        :param text: The text of the sentence that is associated to all of \
        the aspects in the aspect_data list
        :type aspect_data: list
        :type text: String
        :returns: The list of dicts in the aspect_data parameter but with a \
        `text` key whose value is the text parameter
        :rtype: list
        '''
        for data in aspect_data:
            data['text'] = text
        return aspect_data

    all_aspect_term_data = TargetCollection(**target_collection_kwargs)
    for sentence in sentences:
        aspect_term_data = None
        text_index = None
        sentence_id = file_name + sentence.attrib['id']
        # Allow the parser to skip certain sentences
        if sentence_ids_skip is not None:
            if sentence.attrib['id'] in sentence_ids_skip:
                continue
        for index, data in enumerate(sentence):
            if data.tag == 'sentence':
                raise Exception('Unexpected nested `sentence` element '
                                'within sentence id {}'
                                .format(sentence.attrib['id']))
            if data.tag == 'text':
                text_index = index
            elif data.tag == 'aspectTerms' or data.tag == 'Opinions':
                aspect_term_data = extract_aspect_terms(data, sentence_id)
        if aspect_term_data is None:
            continue
        if text_index is None:
            raise ValueError('A semeval sentence should always have text, '
                             'semeval file {} sentence id {}'
                             .format(file_name, sentence.attrib['id']))
        sentence_text = sentence[text_index].text
        aspect_term_data = add_text(aspect_term_data, sentence_text)
        for aspect in aspect_term_data:
            sent_target = Target(**aspect)
            all_aspect_term_data.add(sent_target)
    return all_aspect_term_data
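

# A sketch of the XML that `_semeval_extract_data` expects, based on the
# tags and attributes read above (the sentence content is illustrative;
# `aspectTerms`/`term` is the 2014 naming, `Opinions`/`target` the
# 2015/16 naming):
#
#     <sentences>
#         <sentence id="813">
#             <text>The staff was friendly.</text>
#             <aspectTerms>
#                 <aspectTerm term="staff" polarity="positive"
#                             from="4" to="9"/>
#             </aspectTerms>
#         </sentence>
#     </sentences>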


def semeval_15_16(file_path: Path, sep_16_from_15: bool = False,
                  sep_15_from_14: bool = False,
                  raise_error_no_category: bool = False,
                  **target_collection_kwargs) -> TargetCollection:
    '''
    Parser for the SemEval 2015 and 2016 datasets.

    :param file_path: File path to the semeval 2015/16 data
    :param sep_16_from_15: Ensure that the semeval 2016 test set is
                           completely separate from the semeval 2015
                           test set.
    :param sep_15_from_14: Ensure that the semeval 2015 test set is
                           completely separate from the semeval 2014
                           test set.
    :param raise_error_no_category: Whether to raise a KeyError when an
                                    opinion element has no `category`
                                    attribute.
    :param target_collection_kwargs: Keywords to pass to the TargetCollection
                                     constructor that is returned.
    :returns: A TargetCollection containing Target instances.
    '''
    file_path = os.path.abspath(file_path)
    file_name, _ = os.path.splitext(os.path.basename(file_path))
    tree = ET.parse(file_path)
    reviews = tree.getroot()
    all_aspect_term_data = []
    ids_to_skip = []
    if sep_16_from_15:
        ids_to_skip = ["en_SnoozeanAMEatery_480032670:4"]
    elif sep_15_from_14:
        ids_to_skip = ['1253117:2', '397331:1']
    if reviews.tag != 'Reviews':
        raise ValueError('The root of all semeval 15/16 xml files should '
                         f'be Reviews and not {reviews.tag}')
    for review in reviews:
        for sentences in review:
            review_targets = _semeval_extract_data(
                sentences, file_name, sentence_ids_skip=ids_to_skip,
                raise_error_no_category=raise_error_no_category)
            all_aspect_term_data.extend(review_targets.data())
    return TargetCollection(all_aspect_term_data, **target_collection_kwargs)
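

# Usage sketch (the file name here is hypothetical). `sep_16_from_15`
# skips the sentence ID in the hard-coded list above so that the 2016
# test set stays disjoint from the 2015 one:
#
#     test_16 = semeval_15_16('EN_REST_SB1_TEST.xml', sep_16_from_15=True)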


def semeval_14(file_path: Path, conflict: bool = False,
               **target_collection_kwargs) -> TargetCollection:
    '''
    Parser for the SemEval 2014 datasets.

    :param file_path: File path to the semeval 2014 data
    :param conflict: Determine whether to include targets with the
                     `conflict` sentiment value.
    :param target_collection_kwargs: Keywords to pass to the TargetCollection
                                     constructor that is returned.
    :returns: A TargetCollection containing Target instances.
    '''
    file_path = os.path.abspath(file_path)
    file_name, _ = os.path.splitext(os.path.basename(file_path))
    tree = ET.parse(file_path)
    sentences = tree.getroot()
    if sentences.tag != 'sentences':
        raise ValueError('The root of all semeval 14 xml files should '
                         'be sentences and not {}'.format(sentences.tag))
    return _semeval_extract_data(sentences, file_name, conflict=conflict,
                                 **target_collection_kwargs)
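

# Usage sketch (the file name here is hypothetical). `conflict=True`
# keeps targets with the `conflict` polarity, which are mapped to -2:
#
#     train_14 = semeval_14('Restaurants_Train.xml', conflict=True)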


def election(folder_path: Path, include_dnr: bool = False,
             include_additional: bool = False
             ) -> Tuple[TargetCollection, TargetCollection]:
    '''
    Parser for the UK election Twitter corpus. The data can be downloaded
    from `FigShare
    <https://figshare.com/articles/EACL_2017_-_Multi-target_\
    UK_election_Twitter_sentiment_corpus/4479563/1>`_

    :param folder_path: Path to the folder containing the data after it has
                        been unzipped and all folders within it have also
                        been unzipped.
    :param include_dnr: Determine whether to include the `doesnotapply` label
    :param include_additional: NOTE: This does not work at the moment.
                               Determine whether to parse the additional
                               data.
    :returns: A tuple of (train, test) TargetCollections containing Target
              instances.
    '''
    sentiment_mapper = {'negative' : -1, 'neutral' : 0, 'positive' : 1}

    folder_path = os.path.abspath(folder_path)
    folder_name, _ = os.path.splitext(os.path.basename(folder_path))

    def get_file_data(folder_dir):
        '''
        :param folder_dir: File path to a folder containing JSON data files \
        where each file name is the data's ID
        :type folder_dir: String
        :returns: A dictionary of IDs as keys and JSON data as values
        :rtype: dict
        '''
        data = {}
        for file_name in os.listdir(folder_dir):
            file_path = os.path.join(folder_dir, file_name)
            # Strip the `.json` extension and any leading `5` characters
            # from the file name to recover the tweet ID
            tweet_id = file_name.rstrip('.json').lstrip('5')
            with open(file_path, 'r') as file_data:
                data[tweet_id] = json.load(file_data)
        return data

    def parse_tweet(tweet_data, anno_data, tweet_id):

        def get_offsets(entity, tweet_text, target):
            # The annotated offsets can be off by one character, so also
            # try shifting the start offset by -1 and +1.
            offset_shifts = [0, -1, 1]
            from_offset = entity['offset']
            for offset_shift in offset_shifts:
                from_offset_shift = from_offset + offset_shift
                to_offset = from_offset_shift + len(target)
                offsets = [(from_offset_shift, to_offset)]
                offset_text = tweet_text[from_offset_shift :
                                         to_offset].lower()
                if offset_text == target.lower():
                    return offsets
            raise ValueError('Offset {} does not match target text {}. '
                             'Full text {}\nid {}'
                             .format(from_offset, target, tweet_text,
                                     tweet_id))

        def fuzzy_target_match(tweet_text, target):
            low_target = target.lower()
            target_searches = [low_target,
                               r'[^\w]' + low_target,
                               r'[^\w]' + low_target + r'[^\w]',
                               low_target + r'[^\w]',
                               low_target.replace(' ', ''),
                               low_target.replace(" '", '')]
            for target_search in target_searches:
                target_matches = list(re.finditer(target_search,
                                                  tweet_text.lower()))
                if len(target_matches) == 1:
                    return target_matches
            if tweet_id in set(['81211671026352128', '78689580104290305',
                                '81209490499960832']):
                return None
            if tweet_id == '75270720671973376' and target == 'kippers':
                return None
            if tweet_id == '65855178264686592' and target == 'tax':
                return None
            print(tweet_data)
            print(anno_data)
            raise ValueError('Cannot find the exact additional '
                             'entity {} within the tweet {}'
                             .format(target, tweet_text))

        target_instances = []
        tweet_id = str(tweet_id)
        tweet_text = tweet_data['content']
        target_ids = []
        # Parse all of the entities that have been detected automatically
        for entity in tweet_data['entities']:
            data_dict = {}
            target = entity['entity']
            target_ids.append(entity['id'])
            entity_id = str(entity['id'])
            data_dict['spans'] = get_offsets(entity, tweet_text, target)
            data_dict['target'] = entity['entity']
            data_dict['target_id'] = folder_name + tweet_id + '#' + entity_id
            data_dict['sentence_id'] = folder_name + tweet_id
            data_dict['sentiment'] = anno_data['items'][entity_id]
            if data_dict['sentiment'] == 'doesnotapply' and not include_dnr:
                continue
            # Convert the sentiment from a String to an Integer
            data_dict['sentiment'] = sentiment_mapper[data_dict['sentiment']]
            data_dict['text'] = tweet_text
            target_instances.append(Target(**data_dict))
        # Parse all of the entities that have been selected by the user
        if include_additional:
            additional_data = anno_data['additional_items']
            if isinstance(additional_data, dict):
                for target, sentiment in additional_data.items():
                    data_dict = {}
                    target_matches = fuzzy_target_match(tweet_text, target)
                    if target_matches is None:
                        continue
                    target_id = max(target_ids) + 1
                    target_ids.append(target_id)
                    data_dict['spans'] = [target_matches[0].span()]
                    data_dict['target'] = target
                    data_dict['sentiment'] = sentiment
                    data_dict['text'] = tweet_text
                    data_dict['sentence_id'] = tweet_id
                    data_dict['target_id'] = tweet_id + '#' + str(target_id)
                    target_instances.append(Target(**data_dict))
        return target_instances

    def get_data(id_file, tweets_data, annos_data):
        targets = []
        with open(id_file, 'r') as id_data:
            for tweet_id in id_data:
                tweet_id = tweet_id.strip()
                tweet_data = tweets_data[tweet_id]
                anno_data = annos_data[tweet_id]
                targets.extend(parse_tweet(tweet_data, anno_data, tweet_id))
        return TargetCollection(targets)

    tweets_data = get_file_data(os.path.join(folder_path, 'tweets'))
    annotations_data = get_file_data(os.path.join(folder_path,
                                                  'annotations'))
    train_ids_file = os.path.join(folder_path, 'train_id.txt')
    train_data = get_data(train_ids_file, tweets_data, annotations_data)
    test_ids_file = os.path.join(folder_path, 'test_id.txt')
    test_data = get_data(test_ids_file, tweets_data, annotations_data)
    return train_data, test_data
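

# Usage sketch. As read by the code above, `folder_path` must contain
# `tweets/` and `annotations/` folders of per-tweet JSON files plus the
# `train_id.txt` and `test_id.txt` ID lists (the path here is
# hypothetical):
#
#     train_data, test_data = election('data/election_corpus')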


def election_train(folder_path: Path, include_dnr: bool = False,
                   include_additional: bool = False,
                   **target_collection_kwargs) -> TargetCollection:
    '''
    Parses and returns only the training data of the UK election Twitter
    corpus. The data can be downloaded from `FigShare
    <https://figshare.com/articles/EACL_2017_-_Multi-target_\
    UK_election_Twitter_sentiment_corpus/4479563/1>`_.

    :param folder_path: Path to the folder containing the data after it has
                        been unzipped and all folders within it have also
                        been unzipped.
    :param include_dnr: Determine whether to include the `doesnotapply` label
    :param include_additional: NOTE: This does not work at the moment.
                               Determine whether to parse the additional
                               data.
    :param target_collection_kwargs: Keywords to pass to the TargetCollection
                                     constructor that is returned.
    :returns: A TargetCollection containing Target instances.
    '''
    train_data, _ = election(folder_path, include_dnr=include_dnr,
                             include_additional=include_additional)
    return TargetCollection(train_data.data(), **target_collection_kwargs)


def election_test(folder_path: Path, include_dnr: bool = False,
                  include_additional: bool = False,
                  **target_collection_kwargs) -> TargetCollection:
    '''
    Parses and returns only the test data of the UK election Twitter
    corpus. The data can be downloaded from `FigShare
    <https://figshare.com/articles/EACL_2017_-_Multi-target_\
    UK_election_Twitter_sentiment_corpus/4479563/1>`_.

    :param folder_path: Path to the folder containing the data after it has
                        been unzipped and all folders within it have also
                        been unzipped.
    :param include_dnr: Determine whether to include the `doesnotapply` label
    :param include_additional: NOTE: This does not work at the moment.
                               Determine whether to parse the additional
                               data.
    :param target_collection_kwargs: Keywords to pass to the TargetCollection
                                     constructor that is returned.
    :returns: A TargetCollection containing Target instances.
    '''
    _, test_data = election(folder_path, include_dnr=include_dnr,
                            include_additional=include_additional)
    return TargetCollection(test_data.data(), **target_collection_kwargs)
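

# Usage sketch for the two wrappers above (hypothetical path). Each
# returns a single TargetCollection instead of the (train, test) tuple:
#
#     train = election_train('data/election_corpus')
#     test = election_test('data/election_corpus')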


def hu_liu(file_path):
    '''
    Parser for the datasets from the following two papers (DOES NOT WORK):

    1. `A Holistic Lexicon-Based Approach to Opinion Mining \
    <https://www.cs.uic.edu/~liub/FBS/opinion-mining-final-WSDM.pdf>`_
    2. `Mining and Summarizing Customer Reviews \
    <https://www.cs.uic.edu/~liub/publications/kdd04-revSummary.pdf>`_

    Currently this does not work. This is due to the dataset not containing
    enough data to determine where the targets are in the text.

    :param file_path: The path to a file containing annotations in the \
    format of the Hu and Liu sentiment datasets.
    :type file_path: String
    :returns: A TargetCollection containing Target instances.
    :rtype: TargetCollection
    '''
    file_path = os.path.abspath(file_path)
    file_name = os.path.basename(file_path)
    sentiment_data = TargetCollection()

    with open(file_path, 'r', encoding='cp1252') as annotations:
        for sentence_index, annotation in enumerate(annotations):
            # If it does not contain ## then it is not a sentence
            if '##' not in annotation:
                continue
            targets_text = annotation.split('##')
            if len(targets_text) > 2 or len(targets_text) < 1:
                raise ValueError('The annotation {} when split on `##` '
                                 'should contain at least the sentence '
                                 'text and at most the text and the '
                                 'targets, not {}'
                                 .format(annotation, targets_text))
            # If it just contains the sentence text then go to the next
            # annotation
            elif len(targets_text) == 1:
                continue
            elif targets_text[0].strip() == '':
                continue
            targets, text = targets_text
            targets = targets.strip()
            text = text.strip()
            sentence_id = file_name + '#{}'.format(sentence_index)
            targets = targets.split(',')
            for target_index, target in enumerate(targets):
                target = target.strip()
                sentiment_match = re.search(r'\[[+-]\d\]$', target)
                is_implicit = re.search(r'\[[up]\]', target)
                if is_implicit:
                    print('Target {} is implicit {}'.format(target, text))
                    continue
                if not sentiment_match:
                    raise ValueError('Target {} does not have a '
                                     'corresponding sentiment value. '
                                     'annotation {}'
                                     .format(target, annotation))
                target_text = target[:sentiment_match.start()].strip()
                sentiment_text = sentiment_match.group().strip().strip('[]')
                sentiment_value = int(sentiment_text)
                target_matches = list(re.finditer(target_text, text))
                if len(target_matches) != 1:
                    # Targets that occur more than once (or not at all) in
                    # the text cannot be disambiguated, so skip them.
                    print('The Target {} can only occur once in the '
                          'text {}'.format(target_text, text))
                    continue
                target_span = target_matches[0].span()
                target_id = sentence_id + '#{}'.format(target_index)
                data_dict = {}
                data_dict['spans'] = [target_span]
                data_dict['target'] = target_text
                data_dict['sentiment'] = sentiment_value
                data_dict['text'] = text
                data_dict['sentence_id'] = sentence_id
                data_dict['target_id'] = target_id
                sentiment_data.add(Target(**data_dict))
    return sentiment_data
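

# A sketch of the annotation format parsed above (the line itself is
# illustrative, not from the dataset). Comma-separated targets with a
# bracketed sentiment strength come before `##`, the sentence text after
# it; `[u]`/`[p]` mark implicit targets, which are skipped:
#
#     camera[+2],lens[-1]##the camera is great but the lens scratches.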


def mitchel(file_name: Path, **target_collection_kwargs) -> TargetCollection:
    '''
    Parser for the dataset introduced by `Mitchell et al. \
    <https://www.aclweb.org/anthology/D13-1171>`_. The dataset can be
    downloaded from `here \
    <http://www.m-mitchell.com/code/MitchellEtAl-13-OpenSentiment.tgz>`_.
    Within the tarball the dataset can be found under /en/10-fold; choose
    one of the folds e.g. train_1 and test_1 to get the full dataset.

    :param file_name: Path to either the train or test data.
    :param target_collection_kwargs: Keywords to pass to the TargetCollection
                                     constructor that is returned.
    :returns: A TargetCollection containing Target instances.
    '''
    def extract_targets(current_target, end_span, start_span, targets,
                        target_spans, target_index, tweet_text,
                        sentiment_data, tweet_id, target_sentiments):
        # Flush any target that is still being built
        if current_target != []:
            target_word = ' '.join(current_target)
            end_span = start_span + len(target_word)
            targets.append(target_word)
            target_spans.append((start_span, end_span))
            start_span, end_span = None, None
            current_target = []
            target_index += 1
        tweet_text = ' '.join(tweet_text)
        for index, target in enumerate(targets):
            target_id = '{}#{}'.format(tweet_id, index)
            target_sentiment = target_sentiments[index]
            target_span = target_spans[index]
            if tweet_text[target_span[0] : target_span[1]] != target:
                raise Exception('The target span {} does not match the '
                                'target word {} in {}'
                                .format(target_span, target, tweet_text))
            target_data = {'spans' : [target_span], 'target_id' : target_id,
                           'target' : target, 'text' : tweet_text,
                           'sentiment' : target_sentiment,
                           'sentence_id' : tweet_id}
            target_data = Target(**target_data)
            sentiment_data.add(target_data)
        return sentiment_data

    sentiment_mapper = {'negative' : -1, 'neutral' : 0, 'positive' : 1}
    sentiment_data = TargetCollection(**target_collection_kwargs)
    with open(file_name, 'r') as fp:
        tweet_id = None
        tweet_text = []
        targets = []
        current_target = []
        target_sentiments = []
        target_spans = []
        start_span = None
        end_span = None
        target_index = 0
        for line in fp:
            line = line.strip()
            tweet_id_line = re.match(r'## Tweet (\d+)', line)
            if tweet_id_line is not None:
                # A new tweet starts; store the targets of the previous one
                if tweet_text != []:
                    sentiment_data = extract_targets(current_target,
                                                     end_span, start_span,
                                                     targets, target_spans,
                                                     target_index,
                                                     tweet_text,
                                                     sentiment_data,
                                                     tweet_id,
                                                     target_sentiments)
                tweet_text = []
                targets = []
                current_target = []
                target_sentiments = []
                target_spans = []
                start_span = None
                end_span = None
                target_index = 0
                tweet_id = tweet_id_line.group(1)
                continue
            if line == '':
                continue
            line_data = line.split('\t')
            if len(line_data) != 3:
                # Some lines have an extra NUMBER column that can be dropped
                if len(line_data) == 4 and line_data[2] == 'NUMBER':
                    line_data = line_data[0], line_data[1], line_data[3]
                else:
                    raise Exception('Cannot parse line {} in Tweet ID {}'
                                    .format(line, tweet_id))
            word, ner_data, sentiment = line_data
            if len(word.split()) != 1:
                raise Exception('The word should not contain whitespace: '
                                '{}'.format(word))
            word = ftfy.fix_encoding(word)
            tweet_text.append(word)
            # The token carries a sentiment, therefore it starts a target
            if sentiment != '_':
                if len(current_target) != 0:
                    if ner_data[0] == 'B':
                        # Flush the previous target before starting a new one
                        target_word = ' '.join(current_target)
                        end_span = start_span + len(target_word)
                        targets.append(target_word)
                        target_spans.append((start_span, end_span))
                        start_span, end_span = None, None
                        current_target = []
                        target_index += 1
                    else:
                        raise Exception('Contains the following target {} '
                                        'id {}'.format(current_target,
                                                       tweet_id))
                current_target.append(word)
                sentiment = sentiment_mapper[sentiment]
                target_sentiments.append(sentiment)
                start_span = len(' '.join(tweet_text)) - len(word)
            elif len(current_target) != 0:
                if ner_data[0] == 'I':
                    # Continuation of the current target
                    current_target.append(word)
                else:
                    # The target has ended; store its word and span
                    target_word = ' '.join(current_target)
                    end_span = start_span + len(target_word)
                    targets.append(target_word)
                    target_spans.append((start_span, end_span))
                    start_span, end_span = None, None
                    current_target = []
                    target_index += 1
        # Store the targets from the last tweet in the file
        sentiment_data = extract_targets(current_target, end_span,
                                         start_span, targets, target_spans,
                                         target_index, tweet_text,
                                         sentiment_data, tweet_id,
                                         target_sentiments)
    return sentiment_data
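

# A sketch of the CoNLL-style format parsed above (the tokens and NER tags
# are illustrative and the columns are tab separated). Each tweet starts
# with a `## Tweet <id>` line followed by one `word<TAB>BIO tag<TAB>
# sentiment` line per token, where `_` means the token carries no
# sentiment:
#
#     ## Tweet 12345
#     I        O           _
#     like     O           _
#     David    B-PERSON    positive
#     Cameron  I-PERSON    _
#
# Usage sketch (path within the tarball, as described in the docstring):
#
#     train_data = mitchel('en/10-fold/train_1')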