Source code for target_extraction.allen.models.target_sentiment.interactive_attention_network

from typing import Optional, List, Dict

from allennlp.common.checks import ConfigurationError, check_dimensions_match
from allennlp.data import Vocabulary, TextFieldTensors
from allennlp.modules.attention import BilinearAttention
from allennlp.modules.seq2vec_encoders import BagOfEmbeddingsEncoder
from allennlp.modules import FeedForward, Seq2SeqEncoder, TextFieldEmbedder
from allennlp.modules import InputVariationalDropout, TimeDistributed
from allennlp.models.model import Model
from allennlp.nn import InitializerApplicator, RegularizerApplicator
from allennlp.nn import Activation, util
from allennlp.training.metrics import CategoricalAccuracy, F1Measure
import torch
from torch.nn.modules import Dropout, Linear
import numpy
from overrides import overrides

from target_extraction.allen.models import target_sentiment
from target_extraction.allen.models.target_sentiment.util import elmo_input_reverse, elmo_input_reshape
from target_extraction.allen.models.target_sentiment.util import concat_position_embeddings
from target_extraction.allen.modules.inter_target import InterTarget
from target_extraction.allen.modules.target_position_weight import TargetPositionWeight

@Model.register("interactive_attention_network_classifier")
class InteractivateAttentionNetworkClassifier(Model):
    def __init__(self, vocab: Vocabulary,
                 context_field_embedder: TextFieldEmbedder,
                 context_encoder: Seq2SeqEncoder,
                 target_encoder: Seq2SeqEncoder,
                 feedforward: Optional[FeedForward] = None,
                 context_attention_activation_function: str = 'tanh',
                 target_attention_activation_function: str = 'tanh',
                 target_field_embedder: Optional[TextFieldEmbedder] = None,
                 inter_target_encoding: Optional[InterTarget] = None,
                 target_position_weight: Optional[TargetPositionWeight] = None,
                 target_position_embedding: Optional[TextFieldEmbedder] = None,
                 initializer: InitializerApplicator = InitializerApplicator(),
                 regularizer: Optional[RegularizerApplicator] = None,
                 dropout: float = 0.0,
                 label_name: str = 'target-sentiment-labels',
                 loss_weights: Optional[List[float]] = None,
                 use_target_sequences: bool = False) -> None:
        super().__init__(vocab, regularizer)
        '''
        :param vocab: A Vocabulary, required in order to compute sizes
                      for input/output projections.
        :param context_field_embedder: Used to embed the context/sentence,
                                       and the target text if
                                       `target_field_embedder` is None but
                                       the `target_encoder` is NOT None.
        :param context_encoder: Encoder that will create the representation
                                of the sentence/context that the target
                                appears in.
        :param target_encoder: Encoder that will create the representation
                               of the target text tokens.
        :param feedforward: An optional feed forward layer to apply after
                            the encoder.
        :param context_attention_activation_function: The activation function
                                                      to be used in the
                                                      attention layer applied
                                                      over the context.
        :param target_attention_activation_function: The activation function
                                                     to be used in the
                                                     attention layer applied
                                                     over the target text.
        :param target_field_embedder: Used to embed the target text, given as
                                      input to the `target_encoder`. This
                                      allows a separate embedding for the
                                      context and target text.
        :param inter_target_encoding: Whether to model the relationship
                                      between targets/aspects.
        :param target_position_weight: Whether to weight the output of the
                                       context encoding based on the position
                                       of the tokens relative to the target
                                       tokens. This weighting is applied
                                       before any attention.
        :param target_position_embedding: Whether or not to concatenate a
                                          position embedding onto the input
                                          embeddings before they are given to
                                          the `context_encoder`.
        :param initializer: Used to initialize the model parameters.
        :param regularizer: If provided, will be used to calculate the
                            regularization penalty during training.
        :param dropout: Applied after each layer apart from the last. All
                        dropout applied to time-based data will be
                        `variational dropout
                        <https://arxiv.org/abs/1512.05287>`_; all else will
                        be standard dropout. Variational dropout is applied
                        to the target vectors after they have been processed
                        by the `inter_target_encoding`, if this is set.
        :param label_name: Name of the label namespace.
        :param loss_weights: The amount of weight to give the negative,
                             neutral, and positive classes respectively,
                             e.g. [0.2, 0.5, 0.3] would weight the negative
                             class by a factor of 0.2, neutral by 0.5, and
                             positive by 0.3. NOTE it assumes the sentiment
                             labels are the following:
                             [negative, neutral, positive].
        :param use_target_sequences: Whether or not to use the target tokens
                                     within the context as the target's
                                     contextualized word representation (CWR).
                                     This only makes sense if the word
                                     representation, i.e. the field embedder,
                                     is a contextualized embedder, e.g. ELMo.
                                     It also requires that the dataset reader
                                     has the `target_sequences` argument set
                                     to True. ANOTHER reason to use this,
                                     even when not using CWRs, is to get
                                     contextualized POS/dependency tags etc.

        This is based on the `Interactive Attention Networks for Aspect-Level
        Sentiment Classification
        <https://www.ijcai.org/proceedings/2017/0568.pdf>`_. The model is
        also known as `IAN`.

        .. _variational dropout:
           https://papers.nips.cc/paper/6241-a-theoretically-grounded-application-of-dropout-in-recurrent-neural-networks.pdf
        '''
        self.label_name = label_name
        self.context_field_embedder = context_field_embedder
        self.target_field_embedder = target_field_embedder
        self.num_classes = self.vocab.get_vocab_size(self.label_name)
        self.target_encoder = target_encoder
        self.context_encoder = context_encoder
        self.feedforward = feedforward
        self._use_target_sequences = use_target_sequences
        if self._use_target_sequences and self.target_field_embedder:
            raise ConfigurationError('`use_target_sequences` cannot be True at'
                                     ' the same time as a value for '
                                     '`target_field_embedder` as the embeddings'
                                     ' come from the context and not a separate embedder')

        context_attention_activation_function = Activation.by_name(
            f'{context_attention_activation_function}')()
        target_attention_activation_function = Activation.by_name(
            f'{target_attention_activation_function}')()

        target_encoder_out = self.target_encoder.get_output_dim()
        context_encoder_out = self.context_encoder.get_output_dim()
        self.context_attention_layer = BilinearAttention(target_encoder_out,
                                                         context_encoder_out,
                                                         context_attention_activation_function,
                                                         normalize=True)
        self.target_attention_layer = BilinearAttention(context_encoder_out,
                                                        target_encoder_out,
                                                        target_attention_activation_function,
                                                        normalize=True)
        # To be used as the pooled input into the target attention layer as
        # the query vector.
        self._context_averager = BagOfEmbeddingsEncoder(context_encoder_out,
                                                        averaged=True)
        # To be used as the pooled input into the context attention layer as
        # the query vector.
        self._target_averager = BagOfEmbeddingsEncoder(target_encoder_out,
                                                       averaged=True)

        # Set the loss weights (have to sort them by order of label index in
        # the vocab)
        self.loss_weights = target_sentiment.util.loss_weight_order(self, loss_weights,
                                                                    self.label_name)

        # Inter target modelling
        self.inter_target_encoding = inter_target_encoding

        if feedforward is not None:
            output_dim = self.feedforward.get_output_dim()
        elif self.inter_target_encoding is not None:
            output_dim = self.inter_target_encoding.get_output_dim()
        else:
            output_dim = target_encoder_out + context_encoder_out
        self.label_projection = Linear(output_dim, self.num_classes)

        self.metrics = {"accuracy": CategoricalAccuracy()}
        self.f1_metrics = {}
        # F1 Scores
        label_index_name = self.vocab.get_index_to_token_vocabulary(self.label_name)
        for label_index, _label_name in label_index_name.items():
            _label_name = f'F1_{_label_name.capitalize()}'
            self.f1_metrics[_label_name] = F1Measure(label_index)

        # Dropout
        self._variational_dropout = InputVariationalDropout(dropout)
        self._naive_dropout = Dropout(dropout)

        # Position embeddings
        self.target_position_embedding = target_position_embedding
        # Ensure that the dimensions of the text field embedder and text
        # encoder match
        if self.target_position_embedding:
            context_and_position_dim = (context_field_embedder.get_output_dim() +
                                        self.target_position_embedding.get_output_dim())
            check_dimensions_match(context_and_position_dim,
                                   context_encoder.get_input_dim(),
                                   "context field embedding dim and the position embeddings",
                                   "text encoder input dim")
        else:
            check_dimensions_match(context_field_embedder.get_output_dim(),
                                   context_encoder.get_input_dim(),
                                   "context field embedding dim",
                                   "text encoder input dim")
        # Ensure that the dimensions of the target or text field embedder and
        # the target encoder match
        target_field_embedder_dim = context_field_embedder.get_output_dim()
        target_field_error = "context field embedding dim"
        if self.target_field_embedder:
            target_field_embedder_dim = target_field_embedder.get_output_dim()
            target_field_error = "target field embedding dim"
        check_dimensions_match(target_field_embedder_dim,
                               target_encoder.get_input_dim(),
                               target_field_error, "target encoder input dim")

        if self.inter_target_encoding:
            check_dimensions_match(target_encoder_out + context_encoder_out,
                                   self.inter_target_encoding.get_input_dim(),
                                   'Output from target and context encoders',
                                   'Inter Target encoder input dim')

        self.target_position_weight = target_position_weight

        # TimeDistributed anything that is related to the targets.
        if self.feedforward is not None:
            self.feedforward = TimeDistributed(self.feedforward)
        self.label_projection = TimeDistributed(self.label_projection)
        self._time_naive_dropout = TimeDistributed(self._naive_dropout)

        initializer(self)
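    # A minimal configuration sketch for this registered model. All names and
    # dimensions below are illustrative, not prescriptive; note that the
    # concatenated output of the two encoders (here 100 + 100 = 200) must
    # match the input dim of any `feedforward` or `inter_target_encoding`
    # module:
    #
    #   "model": {
    #     "type": "interactive_attention_network_classifier",
    #     "context_field_embedder": {
    #       "token_embedders": {
    #         "tokens": {"type": "embedding", "embedding_dim": 300}
    #       }
    #     },
    #     "context_encoder": {"type": "lstm", "input_size": 300,
    #                         "hidden_size": 100},
    #     "target_encoder": {"type": "lstm", "input_size": 300,
    #                        "hidden_size": 100},
    #     "dropout": 0.5,
    #     "loss_weights": [0.2, 0.5, 0.3]
    #   }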
    def forward(self,
                tokens: TextFieldTensors,
                targets: TextFieldTensors,
                target_sentiments: torch.LongTensor = None,
                target_sequences: Optional[torch.LongTensor] = None,
                metadata: Optional[torch.LongTensor] = None,
                position_weights: Optional[torch.LongTensor] = None,
                position_embeddings: Optional[Dict[str, torch.LongTensor]] = None,
                **kwargs) -> Dict[str, torch.Tensor]:
        '''
        The text and targets are dictionaries because, as text fields, they
        can be represented in many different ways, e.g. just words, or words
        and chars. The dictionary therefore maps each representation to its
        tensor, e.g. {'words': words_tensor_ids, 'chars': char_tensor_ids}
        '''
        # Embed and encode text as a sequence
        embedded_context = self.context_field_embedder(tokens)
        embedded_context = self._variational_dropout(embedded_context)
        context_mask = util.get_text_field_mask(tokens)

        targets_mask = util.get_text_field_mask(targets, num_wrapping_dims=1)
        label_mask = (targets_mask.sum(dim=-1) >= 1).type(torch.int64)
        batch_size, number_targets = label_mask.shape

        # Add position embeddings if required.
        batch_size, context_sequence_length, context_embed_dim = embedded_context.shape
        batch_size_num_targets = batch_size * number_targets
        repeated_embeded_context_seq = embedded_context.unsqueeze(1).repeat(1, number_targets, 1, 1)
        repeated_embeded_context_seq = repeated_embeded_context_seq.view(batch_size_num_targets,
                                                                         context_sequence_length,
                                                                         context_embed_dim)
        repeated_embeded_position_context_seq = concat_position_embeddings(repeated_embeded_context_seq,
                                                                           position_embeddings,
                                                                           self.target_position_embedding)
        repeated_context_mask = context_mask.unsqueeze(1).repeat(1, number_targets, 1)
        repeated_context_mask = repeated_context_mask.view(batch_size_num_targets,
                                                           context_sequence_length)
        # Size (batch size * number_targets, sequence length, embedding dim)
        repeated_context_seq = self.context_encoder(repeated_embeded_position_context_seq,
                                                    repeated_context_mask)
        repeated_context_seq = self._variational_dropout(repeated_context_seq)
        context_dim = repeated_context_seq.shape[-1]

        # Determines whether or not to use contextualized targets.
        if self._use_target_sequences:
            # Need to make the target_sequences the mask for the embedded
            # contexts, therefore need to make the embedded context the same
            # size by expanding by the number of targets.
            batch_size, number_targets, target_sequence_length, target_index_length = target_sequences.shape
            target_index_len_err = ('The size of the context sequence '
                                    f'{context_sequence_length} is not the same'
                                    ' as the target index sequence '
                                    f'{target_index_length}. This is to get '
                                    'the contextualized target through the context')
            assert context_sequence_length == target_index_length, target_index_len_err
            seq_targets_mask = target_sequences.view(batch_size_num_targets,
                                                     target_sequence_length,
                                                     target_index_length)
            embedded_targets = torch.matmul(seq_targets_mask.type(torch.float32),
                                            repeated_embeded_context_seq)
        else:
            # Embed and encode target as a sequence. The reshape is required
            # if the input is of shape greater than 3 dim e.g. character
            # input where it is
            # (batch size, number targets, token length, char length)
            temp_targets = elmo_input_reshape(targets, batch_size,
                                              number_targets, batch_size_num_targets)
            if self.target_field_embedder:
                embedded_targets = self.target_field_embedder(temp_targets)
            else:
                embedded_targets = self.context_field_embedder(temp_targets)
            # Size (batch size, num targets, sequence length, embedding dim)
            embedded_targets = elmo_input_reverse(embedded_targets, targets,
                                                  batch_size, number_targets,
                                                  batch_size_num_targets)
            # Batch size * number targets, target length, dim
            _, _, target_sequence_length, embeded_target_dim = embedded_targets.shape
            embedded_targets = embedded_targets.view(batch_size_num_targets,
                                                     target_sequence_length,
                                                     embeded_target_dim)
        embedded_targets = self._variational_dropout(embedded_targets)

        # Encoding sequences
        seq_targets_mask = targets_mask.view(batch_size_num_targets, target_sequence_length)
        seq_encoded_targets_seq = self.target_encoder(embedded_targets, seq_targets_mask)
        seq_encoded_targets_dim = seq_encoded_targets_seq.shape[-1]

        # Weighted position information encoded into the context sequence.
        if self.target_position_weight is not None:
            if position_weights is None:
                raise ValueError('This model requires `position_weights` to '
                                 'better encode the target but none were given')
            position_output = self.target_position_weight(repeated_context_seq,
                                                          position_weights,
                                                          repeated_context_mask)
            repeated_context_seq, weighted_position_weights = position_output

        #
        # Attention layers
        #

        # Context attention: get the average of the target hidden states as
        # the query vector for the context attention.
        # Batch size * number targets, dim
        avg_targets_vec = self._target_averager(seq_encoded_targets_seq, seq_targets_mask)
        # Batch size * number targets, sequence length
        context_attentions_weights = self.context_attention_layer(avg_targets_vec,
                                                                  repeated_context_seq,
                                                                  repeated_context_mask)
        # Convert into Batch Size, Number Targets, sequence length
        context_attentions_weights = context_attentions_weights.view(batch_size,
                                                                     number_targets,
                                                                     context_sequence_length)
        # Convert into Batch Size, Number Targets, sequence length, dim
        weighted_encoded_context_seq = repeated_context_seq.view(batch_size, number_targets,
                                                                 context_sequence_length,
                                                                 context_dim)
        # Batch size, number targets, sequence length, dim
        context_attentions_weights = context_attentions_weights.unsqueeze(-1)
        weighted_encoded_context_seq = weighted_encoded_context_seq * context_attentions_weights
        # Batch size, number targets, dim
        weighted_encoded_context_vec = weighted_encoded_context_seq.sum(2)

        # Target attention: get the average of the context hidden states as
        # the query vector for the target attention.
        # Batch size * number targets, dim
        repeated_avg_context_vec = self._context_averager(repeated_context_seq,
                                                          repeated_context_mask)
        # Batch size * number targets, target length
        target_attention_weights = self.target_attention_layer(repeated_avg_context_vec,
                                                               seq_encoded_targets_seq,
                                                               seq_targets_mask)
        # Batch size, number targets, target length
        target_attention_weights = target_attention_weights.view(batch_size, number_targets,
                                                                 target_sequence_length)
        target_attention_weights = target_attention_weights.unsqueeze(-1)
        weighted_encoded_target_seq = seq_encoded_targets_seq.view(batch_size, number_targets,
                                                                   target_sequence_length,
                                                                   seq_encoded_targets_dim)
        weighted_encoded_target_seq = weighted_encoded_target_seq * target_attention_weights
        # Batch size, number targets, dim
        weighted_encoded_target_vec = weighted_encoded_target_seq.sum(2)

        # Concatenate the weighted context and target vectors
        weighted_text_target = torch.cat([weighted_encoded_context_vec,
                                          weighted_encoded_target_vec], -1)
        weighted_text_target = self._time_naive_dropout(weighted_text_target)

        if self.inter_target_encoding is not None:
            weighted_text_target = self.inter_target_encoding(weighted_text_target, label_mask)
            weighted_text_target = self._variational_dropout(weighted_text_target)

        if self.feedforward:
            weighted_text_target = self.feedforward(weighted_text_target)
        # Putting it through a tanh first and then a softmax
        logits = self.label_projection(weighted_text_target)
        logits = torch.tanh(logits)
        masked_class_probabilities = util.masked_softmax(logits, label_mask.unsqueeze(-1))

        output_dict = {"class_probabilities": masked_class_probabilities,
                       "targets_mask": label_mask}
        # Convert it to a bool tensor.
        label_mask = label_mask == 1

        if target_sentiments is not None:
            # Gets the loss per target instance due to the average=`token`
            if self.loss_weights is not None:
                loss = util.sequence_cross_entropy_with_logits(logits, target_sentiments,
                                                               label_mask, average='token',
                                                               alpha=self.loss_weights)
            else:
                loss = util.sequence_cross_entropy_with_logits(logits, target_sentiments,
                                                               label_mask, average='token')
            for metrics in [self.metrics, self.f1_metrics]:
                for metric in metrics.values():
                    metric(logits, target_sentiments, label_mask)
            output_dict["loss"] = loss

        if metadata is not None:
            words = []
            texts = []
            meta_targets = []
            target_words = []
            for batch_index, sample in enumerate(metadata):
                words.append(sample['text words'])
                texts.append(sample['text'])
                meta_targets.append(sample['targets'])
                target_words.append(sample['target words'])
            output_dict["words"] = words
            output_dict["text"] = texts
            output_dict["word_attention"] = context_attentions_weights
            output_dict["targets"] = meta_targets
            output_dict["target words"] = target_words
            output_dict["targets_attention"] = target_attention_weights
            output_dict["context_mask"] = context_mask
            output_dict["targets_word_mask"] = targets_mask

        return output_dict
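    # Shape walkthrough for the repeat/view bookkeeping in `forward` (a
    # sketch with made-up sizes): with batch_size=2, number_targets=3, a
    # 40-token context, and 300-dim embeddings, `embedded_context` is
    # (2, 40, 300); `unsqueeze(1).repeat(1, 3, 1, 1)` makes it
    # (2, 3, 40, 300) and the `view` flattens it to (6, 40, 300), so each
    # target attends over its own copy of the encoded context. The two
    # attention-weighted sums then reduce the sequence dimensions, giving
    # (2, 3, context_dim) and (2, 3, target_dim) vectors that are
    # concatenated into the (2, 3, context_dim + target_dim) input of the
    # label projection.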
    @overrides
    def make_output_human_readable(self, output_dict: Dict[str, torch.Tensor]
                                   ) -> Dict[str, torch.Tensor]:
        '''
        Adds the predicted label to the output dict, and also removes any
        class probabilities that do not have a target associated, which is
        caused through the batch prediction process and can be removed by
        using the target mask.

        Everything in the dictionary will be of length (batch size * number
        of targets), where the number of targets is based on the number of
        targets in each sentence, e.g. if the batch has two sentences where
        the first contains 2 targets and the second 3 targets, the number
        returned will be (2 + 3) 5 target sentiments.
        '''
        batch_target_predictions = output_dict['class_probabilities'].cpu().data.numpy()
        label_masks = output_dict['targets_mask'].cpu().data.numpy()
        cpu_context_mask = output_dict["context_mask"].cpu().data.numpy()
        cpu_context_attentions_weights = output_dict["word_attention"].cpu().data.numpy()
        cpu_targets_mask = output_dict["targets_word_mask"].cpu().data.numpy()
        cpu_target_attention_weights = output_dict["targets_attention"].cpu().data.numpy()
        # Should have the same batch size and max target numbers
        batch_size = batch_target_predictions.shape[0]
        max_number_targets = batch_target_predictions.shape[1]
        assert label_masks.shape[0] == batch_size
        assert label_masks.shape[1] == max_number_targets
        _, context_sequence_length = cpu_context_mask.shape
        _, _, target_sequence_length = cpu_targets_mask.shape

        sentiments = []
        non_masked_class_probabilities = []
        word_attention = []
        target_attention = []
        for batch_index in range(batch_size):
            # Sentiment and class probabilities
            target_sentiments = []
            target_non_masked_class_probabilities = []
            target_predictions = batch_target_predictions[batch_index]
            label_mask = label_masks[batch_index]
            # Attention parameters
            word_attention_batch = []
            target_attention_batch = []
            relevant_word_mask = cpu_context_mask[batch_index]
            relevant_target_mask = cpu_targets_mask[batch_index]
            for target_index in range(max_number_targets):
                if label_mask[target_index] != 1:
                    continue
                # Sentiment and class probabilities
                target_prediction = target_predictions[target_index]
                label_index = numpy.argmax(target_prediction)
                label = self.vocab.get_token_from_index(label_index,
                                                        namespace=self.label_name)
                target_sentiments.append(label)
                target_non_masked_class_probabilities.append(target_prediction)
                # Attention parameters
                context_target_word_attention = []
                for word_index in range(context_sequence_length):
                    if not relevant_word_mask[word_index]:
                        continue
                    context_target_word_attention.append(
                        cpu_context_attentions_weights[batch_index][target_index][word_index][0])
                word_attention_batch.append(context_target_word_attention)
                target_word_attention = []
                for target_word_index in range(target_sequence_length):
                    if not relevant_target_mask[target_index][target_word_index]:
                        continue
                    target_word_attention.append(
                        cpu_target_attention_weights[batch_index][target_index][target_word_index][0])
                target_attention_batch.append(target_word_attention)
            word_attention.append(word_attention_batch)
            target_attention.append(target_attention_batch)
            sentiments.append(target_sentiments)
            non_masked_class_probabilities.append(target_non_masked_class_probabilities)

        output_dict['sentiments'] = sentiments
        output_dict['class_probabilities'] = non_masked_class_probabilities
        output_dict['word_attention'] = word_attention
        output_dict['targets_attention'] = target_attention
        return output_dict
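    # Illustrative output for a batch of two sentences containing 2 and 1
    # targets respectively (label strings depend on the vocabulary; the
    # values are made up):
    #   output_dict['sentiments'] == [['negative', 'positive'], ['neutral']]
    # `word_attention` and `targets_attention` hold one list of per-token
    # weights per surviving target, truncated to the unmasked context and
    # target tokens.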
    def get_metrics(self, reset: bool = False) -> Dict[str, float]:
        # Other scores
        metric_name_value = {}
        for metric_name, metric in self.metrics.items():
            metric_name_value[metric_name] = metric.get_metric(reset)
        # F1 scores
        all_f1_scores = []
        for metric_name, metric in self.f1_metrics.items():
            precision, recall, f1_measure = metric.get_metric(reset)
            all_f1_scores.append(f1_measure)
            metric_name_value[metric_name] = f1_measure
        metric_name_value['Macro_F1'] = sum(all_f1_scores) / len(self.f1_metrics)
        return metric_name_value
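# Illustrative metrics dictionary returned by `get_metrics` (the F1 keys
# depend on the label vocabulary; the values here are made up):
#   {'accuracy': 0.78, 'F1_Negative': 0.71, 'F1_Neutral': 0.64,
#    'F1_Positive': 0.82, 'Macro_F1': 0.7233}
# where Macro_F1 is the unweighted mean of the per-class F1 scores.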