from dataclasses import dataclass from typing import Any, Callable, Dict, List, NewType, Optional, Tuple, Union from enum import Enum import torch from torch.utils.data import DataLoader from transformers import AutoTokenizer, BertTokenizer # from transformers.configuration_bert import BertTokenizer, BertTokenizerFast from transformers.tokenization_utils_base import (BatchEncoding, PreTrainedTokenizerBase) from .base_data_module import BaseDataModule from .processor import KGProcessor, get_dataset import transformers transformers.logging.set_verbosity_error() class ExplicitEnum(Enum): """ Enum with more explicit error message for missing values. """ @classmethod def _missing_(cls, value): raise ValueError( f"{value} is not a valid {cls.__name__}, please select one of {list(cls._value2member_map_.keys())}" ) class PaddingStrategy(ExplicitEnum): """ Possible values for the ``padding`` argument in :meth:`PreTrainedTokenizerBase.__call__`. Useful for tab-completion in an IDE. """ LONGEST = "longest" MAX_LENGTH = "max_length" DO_NOT_PAD = "do_not_pad" import numpy as np @dataclass class DataCollatorForSeq2Seq: """ Data collator that will dynamically pad the inputs received, as well as the labels. Args: tokenizer (:class:`~transformers.PreTrainedTokenizer` or :class:`~transformers.PreTrainedTokenizerFast`): The tokenizer used for encoding the data. model (:class:`~transformers.PreTrainedModel`): The model that is being trained. If set and has the `prepare_decoder_input_ids_from_labels`, use it to prepare the `decoder_input_ids` This is useful when using `label_smoothing` to avoid calculating loss twice. padding (:obj:`bool`, :obj:`str` or :class:`~transformers.file_utils.PaddingStrategy`, `optional`, defaults to :obj:`True`): Select a strategy to pad the returned sequences (according to the model's padding side and padding index) among: * :obj:`True` or :obj:`'longest'`: Pad to the longest sequence in the batch (or no padding if only a single sequence is provided). * :obj:`'max_length'`: Pad to a maximum length specified with the argument :obj:`max_length` or to the maximum acceptable input length for the model if that argument is not provided. * :obj:`False` or :obj:`'do_not_pad'` (default): No padding (i.e., can output a batch with sequences of different lengths). max_length (:obj:`int`, `optional`): Maximum length of the returned list and optionally padding length (see above). pad_to_multiple_of (:obj:`int`, `optional`): If set will pad the sequence to a multiple of the provided value. This is especially useful to enable the use of Tensor Cores on NVIDIA hardware with compute capability >= 7.5 (Volta). label_pad_token_id (:obj:`int`, `optional`, defaults to -100): The id to use when padding the labels (-100 will be automatically ignored by PyTorch loss functions). """ tokenizer: PreTrainedTokenizerBase model: Optional[Any] = None padding: Union[bool, str, PaddingStrategy] = True max_length: Optional[int] = None pad_to_multiple_of: Optional[int] = None label_pad_token_id: int = -100 return_tensors: str = "pt" num_labels: int = 0 def __call__(self, features, return_tensors=None): if return_tensors is None: return_tensors = self.return_tensors labels = [feature.pop("labels") for feature in features] if "labels" in features[0].keys() else None label = [feature.pop("label") for feature in features] features_keys = {} name_keys = list(features[0].keys()) for k in name_keys: # ignore the padding arguments if k in ["input_ids", "attention_mask", "token_type_ids"]: continue try: features_keys[k] = [feature.pop(k) for feature in features] except KeyError: continue # features_keys[k] = [feature.pop(k) for feature in features] # We have to pad the labels before calling `tokenizer.pad` as this method won't pad them and needs them of the # same length to return tensors. bsz = len(labels) with torch.no_grad(): new_labels = torch.zeros(bsz, self.num_labels) for i,l in enumerate(labels): if isinstance(l, int): new_labels[i][l] = 1 else: for j in l: new_labels[i][j] = 1 labels = new_labels features = self.tokenizer.pad( features, padding=self.padding, max_length=self.max_length, pad_to_multiple_of=self.pad_to_multiple_of, return_tensors=return_tensors, ) features['labels'] = labels features['label'] = torch.tensor(label) features.update(features_keys) return features class KGC(BaseDataModule): def __init__(self, args, model) -> None: super().__init__(args) self.tokenizer = AutoTokenizer.from_pretrained(self.args.model_name_or_path, use_fast=False) self.processor = KGProcessor(self.tokenizer, args) self.label_list = self.processor.get_labels(args.data_dir) entity_list = self.processor.get_entities(args.data_dir) num_added_tokens = self.tokenizer.add_special_tokens({'additional_special_tokens': entity_list}) self.sampler = DataCollatorForSeq2Seq(self.tokenizer, model=model, label_pad_token_id=self.tokenizer.pad_token_id, pad_to_multiple_of=8 if self.args.precision == 16 else None, padding="longest", max_length=self.args.max_seq_length, num_labels = len(entity_list), ) relations_tokens = self.processor.get_relations(args.data_dir) self.num_relations = len(relations_tokens) num_added_tokens = self.tokenizer.add_special_tokens({'additional_special_tokens': relations_tokens}) vocab = self.tokenizer.get_added_vocab() self.relation_id_st = vocab[relations_tokens[0]] self.relation_id_ed = vocab[relations_tokens[-1]] + 1 self.entity_id_st = vocab[entity_list[0]] self.entity_id_ed = vocab[entity_list[-1]] + 1 def setup(self, stage=None): self.data_train = get_dataset(self.args, self.processor, self.label_list, self.tokenizer, "train") self.data_val = get_dataset(self.args, self.processor, self.label_list, self.tokenizer, "dev") self.data_test = get_dataset(self.args, self.processor, self.label_list, self.tokenizer, "test") def prepare_data(self): pass def get_config(self): d = {} for k, v in self.__dict__.items(): if "st" in k or "ed" in k: d.update({k:v}) return d @staticmethod def add_to_argparse(parser): BaseDataModule.add_to_argparse(parser) parser.add_argument("--model_name_or_path", type=str, default="roberta-base", help="the name or the path to the pretrained model") parser.add_argument("--data_dir", type=str, default="roberta-base", help="the name or the path to the pretrained model") parser.add_argument("--max_seq_length", type=int, default=256, help="Number of examples to operate on per forward step.") parser.add_argument("--warm_up_radio", type=float, default=0.1, help="Number of examples to operate on per forward step.") parser.add_argument("--eval_batch_size", type=int, default=8) parser.add_argument("--overwrite_cache", action="store_true", default=False) return parser def get_tokenizer(self): return self.tokenizer def train_dataloader(self): return DataLoader(self.data_train, num_workers=self.num_workers, pin_memory=True, collate_fn=self.sampler, batch_size=self.args.batch_size, shuffle=not self.args.faiss_init) def val_dataloader(self): return DataLoader(self.data_val, num_workers=self.num_workers, pin_memory=True, collate_fn=self.sampler, batch_size=self.args.eval_batch_size) def test_dataloader(self): return DataLoader(self.data_test, num_workers=self.num_workers, pin_memory=True, collate_fn=self.sampler, batch_size=self.args.eval_batch_size)