Source code for botiverse.Theorizer.squad.sample_data

from typing import List, Tuple, Dict, Any
from collections import Counter
from dataclasses import dataclass
import os

import numpy as np
from .utils import *
from .info_extractor import *
import pickle as pkl

# Absolute directory of this module; used below to locate the bundled
# "sample_probs.pkl" independently of the current working directory.
current_file_dir = os.path.dirname(os.path.abspath(__file__))

@dataclass
class AnswerSample:
    """One sampled answer span together with its position and tags.

    Instances are produced by ``select_answers`` and consumed by
    ``select_questions``, ``select_clues`` and ``select``.
    """

    # Answer text as sliced out of the source sentence.
    answer_text: str
    # Character offsets of the answer inside the sentence.
    char_st: int
    char_ed: int
    # Token indices of the first and last answer token (inclusive).
    st: int
    ed: int
    # One label per sentence token: "B" on the first answer token,
    # "I" on the remaining answer tokens, "O" elsewhere.
    answer_bio_ids: List[str]
    # POS tag and NER tag of the chunk the answer was taken from.
    answer_pos_tag: str
    answer_ner_tag: str
@dataclass
class ClueSample:
    """One sampled clue chunk: its text plus a per-token membership mask."""

    # Clue tokens joined with single spaces.
    clue_text: str
    # One entry per sentence token: 1 for tokens inside the clue, 0 otherwise.
    # ``select_clues`` in this module stores a plain Python list here, so the
    # annotation reflects that rather than ``np.ndarray``.
    # NOTE(review): if other callers pass NumPy arrays, widen this annotation.
    clue_binary_ids: List[int]
def select_answers(chunklist, sentence, sample_probs, config=InfoConfig()) -> List[AnswerSample]:
    """
    Sample answer chunks from ``chunklist`` and locate each one in ``sentence``.

    Every chunk gets a sampling weight looked up in ``sample_probs["a"]`` under
    the key ``"<pos>-<ner>_<length_bin>"``. Chunks whose first token is in
    ``NOT_BEGIN_TOKENS_FOR_ANSWER_CLUE``, and chunks whose key is missing from
    the table, get weight 1. Chunk ids are then drawn
    ``config.max_sample_times`` times, de-duplicated in draw order, converted
    to ``AnswerSample`` objects and truncated to ``config.num_sample_answer``.

    Args:
        chunklist (list): Chunks as ``(ner_tag, pos_tag, leaves, st, ed)``
            tuples, where ``st``/``ed`` are inclusive token indices.
        sentence (str): The sentence the chunks were extracted from.
        sample_probs (dict): Probability tables; only ``sample_probs["a"]`` is
            read here.
        config (InfoConfig, optional): Supplies ``ans_len_min_val``,
            ``ans_len_max_val``, ``ans_len_bin_width``, ``max_sample_times``
            and ``num_sample_answer``.

    Returns:
        List[AnswerSample]: At most ``config.num_sample_answer`` answers.
        Chunks whose text cannot be found in ``sentence`` are dropped.
    """
    # Nothing to sample from; also avoids drawing from an empty population.
    if not chunklist:
        return []

    token2idx, idx2token = token_to_char_indices(sentence)

    def answer_weight(chunk):
        """Return the sampling weight of one chunk."""
        chunk_ner_tag, chunk_pos_tag, leaves, st, ed = chunk
        if leaves[0].lower() in NOT_BEGIN_TOKENS_FOR_ANSWER_CLUE:
            return 1
        # st/ed are inclusive token indices, so the length is ed - st + 1.
        # The previous ``abs(st - ed + 1)`` evaluated to |length - 2|.
        length_bin = value_to_bin(
            ed - st + 1,
            config.ans_len_min_val,
            config.ans_len_max_val,
            config.ans_len_bin_width,
        )
        # POS-NER order, matching the tags built in select_questions and
        # select_clues (this function previously used NER-POS).
        a_tag = "-".join([chunk_pos_tag, chunk_ner_tag])
        # Unseen conditions fall back to weight 1 instead of raising KeyError,
        # as the sibling lookups do.
        return sample_probs["a"].get("_".join([a_tag, str(length_bin)]), 1)

    a_probs = [answer_weight(chunk) for chunk in chunklist]
    chunk_ids = list(range(len(chunklist)))

    # De-duplicate while keeping draw order. A plain set would be iterated in
    # an order unrelated to the draws, so the final truncation would not keep
    # the first-drawn chunks.
    sampled_chunk_ids = list(dict.fromkeys(
        weighted_sample(chunk_ids, a_probs)
        for _ in range(config.max_sample_times)
    ))

    sentence_spacydoc = NLP(sentence)

    def process_chunk(chunk):
        """Build an AnswerSample for ``chunk``; None if it is not found."""
        chunk_ner_tag, chunk_pos_tag, leaves, st, ed = chunk
        context = sentence
        char_st, char_ed = str_find(context, leaves)
        if char_st < 0:
            return None
        answer_text = context[char_st: char_ed + 1]
        # Re-derive token indices from the character match.
        st, ed = idx2token[char_st], idx2token[char_ed]
        answer_bio_ids = ["O"] * len(sentence_spacydoc)
        answer_bio_ids[st: ed + 1] = ["I"] * (ed - st + 1)
        answer_bio_ids[st] = "B"
        # Snap the character span to the boundaries of the matched tokens.
        char_st, char_ed = token2idx[st][0], token2idx[ed][1]
        return AnswerSample(
            answer_text=answer_text,
            char_st=char_st,
            char_ed=char_ed,
            st=st,
            ed=ed,
            answer_bio_ids=answer_bio_ids,
            answer_pos_tag=chunk_pos_tag,
            answer_ner_tag=chunk_ner_tag,
        )

    candidates = (process_chunk(chunklist[chunk_id]) for chunk_id in sampled_chunk_ids)
    sampled_answers = [answer for answer in candidates if answer is not None]
    return sampled_answers[:config.num_sample_answer]
def select_questions(ans: AnswerSample, sample_probs, config=InfoConfig()):
    """
    Sample distinct question styles for an answer.

    Each style in ``QUESTION_TYPES`` is weighted by
    ``sample_probs["s|c,a"]["<style>_<pos>-<ner>"]``, falling back to 1 when
    the key is absent.

    Args:
        ans (AnswerSample): The answer whose POS and NER tags condition the
            style distribution.
        sample_probs (dict): Probability tables; only ``"s|c,a"`` is read.
        config (InfoConfig, optional): Supplies ``max_sample_times`` (upper
            bound on draws) and ``num_sample_style`` (target number of
            distinct styles).

    Returns:
        list: Distinct sampled styles, in the order they were first drawn.
    """
    answer_tag = "-".join((ans.answer_pos_tag, ans.answer_ner_tag))

    # One weight per question style, aligned with QUESTION_TYPES.
    style_weights = []
    for style in QUESTION_TYPES:
        condition_key = "_".join((style, answer_tag))
        style_weights.append(sample_probs["s|c,a"].get(condition_key, 1))

    chosen_styles = []
    for _ in range(config.max_sample_times):
        candidate = weighted_sample(QUESTION_TYPES, style_weights)
        if candidate in chosen_styles:
            # Repeat draw: nothing changed, so the quota cannot be newly met.
            continue
        chosen_styles.append(candidate)
        if len(chosen_styles) >= config.num_sample_style:
            break
    return chosen_styles
def select_clues(chunklist, doc: SpacyDoc, ans: AnswerSample, sample_probs, config=InfoConfig()):
    """
    Sample clue chunks for an answer, weighted by tag and distance to it.

    For every chunk the distance to the answer is the smaller of
    (a) the token-index distance between the chunk start and ``ans.st`` and
    (b) the shortest dependency path from ``ans.st`` to the chunk's first
    token, when such a path is listed. The binned distance and the chunk's
    ``"<pos>-<ner>"`` tag form the key looked up in ``sample_probs["c|a"]``.
    Missing keys, and chunks whose first token is in
    ``NOT_BEGIN_TOKENS_FOR_ANSWER_CLUE``, get weight 1.

    Args:
        chunklist (list): Chunks as ``(ner_tag, pos_tag, leaves, st, ed)``
            tuples, where ``st``/``ed`` are inclusive token indices.
        doc (spacy.tokens.Doc): The parsed sentence.
        ans (AnswerSample): The answer the clues are sampled for.
        sample_probs (dict): Probability tables; only ``"c|a"`` is read.
        config (InfoConfig, optional): Supplies ``clue_dep_dist_min_val``,
            ``clue_dep_dist_max_val``, ``clue_dep_dist_bin_width``,
            ``max_sample_times`` and ``num_sample_clue``.

    Returns:
        list: ``ClueSample`` objects, one per distinct sampled chunk, in the
        order they were first drawn. Empty when ``chunklist`` is empty.
    """
    # Nothing to sample from; also avoids drawing from an empty population.
    if not chunklist:
        return []

    st = ans.st
    # Entries reachable from the answer's first token, iterated as
    # (token_id, path) pairs. Hoisted because it is the same for every chunk.
    related_to_answer = get_dependency_paths(doc)[1][st]
    context_tokens = [token.text for token in doc]

    # Calculate chunk probabilities
    c_probs = []
    for chunk in chunklist:
        c_tag = "-".join([chunk[1], chunk[0]])
        positional_dist = abs(chunk[3] - st)
        # ``default`` covers chunks with no listed dependency path, where a
        # bare min() over the empty generator would raise ValueError.
        shortest_path = min(
            (len(path) for tk_id, path in related_to_answer if tk_id == chunk[3]),
            default=positional_dist,
        )
        dep_dist = min(positional_dist, shortest_path)
        dep_dist_bin = value_to_bin(
            dep_dist,
            config.clue_dep_dist_min_val,
            config.clue_dep_dist_max_val,
            config.clue_dep_dist_bin_width,
        )
        if chunk[2][0].lower() not in NOT_BEGIN_TOKENS_FOR_ANSWER_CLUE:
            c_probs.append(sample_probs["c|a"].get(
                "_".join([c_tag, str(dep_dist_bin)]), 1))
        else:
            c_probs.append(1)

    # Sample clues
    chunk_ids = list(range(len(chunklist)))
    sampled_clue_chunk_ids = []
    for _ in range(config.max_sample_times):
        sampled_chunk_id = weighted_sample(chunk_ids, c_probs)
        if sampled_chunk_id not in sampled_clue_chunk_ids:
            sampled_clue_chunk_ids.append(sampled_chunk_id)
        if len(sampled_clue_chunk_ids) >= config.num_sample_clue:
            break

    # Extract clue details
    sampled_clues = []
    for chunk_id in sampled_clue_chunk_ids:
        chunk = chunklist[chunk_id]
        clue_start, clue_end = chunk[3], chunk[4]
        clue_text = " ".join(context_tokens[clue_start: clue_end + 1])
        # Per-token mask: 1 inside the clue span, 0 elsewhere.
        clue_binary_ids = [0] * len(doc)
        clue_binary_ids[clue_start: clue_end + 1] = [1] * (clue_end - clue_start + 1)
        sampled_clues.append(ClueSample(clue_text=clue_text,
                                        clue_binary_ids=clue_binary_ids))
    return sampled_clues
def select(sentence, sample_probs, config=InfoConfig()):
    """
    Sample answers, and for each answer its question styles and clues.

    Args:
        sentence (str): The sentence to sample from.
        sample_probs (dict): Probability tables passed to ``select_answers``,
            ``select_questions`` and ``select_clues``.
        config (InfoConfig, optional): Sampling configuration. It is forwarded
            to all three sampling steps; previously it was accepted but
            ignored, so callers always got the default configuration.

    Returns:
        dict: ``{"context": sentence, "selected_infos": [...],
        "ans_sent_doc": <text of the parsed sentence>}``. Each entry of
        ``selected_infos`` has the keys ``"answer"`` (a dict with
        ``answer_text``, ``char_start``, ``char_end``, ``answer_bio_ids`` and
        ``answer_chunk_tag``, which holds the answer's POS tag), ``"styles"``
        (list of sampled question styles) and ``"clues"`` (list of
        ``ClueSample``).
    """
    sampled_infos = []
    chunklist = chunks(sentence)
    doc = NLP(sentence)
    for ans in select_answers(chunklist, sentence, sample_probs, config=config):
        info = {
            "answer": {
                "answer_text": ans.answer_text,
                "char_start": ans.char_st,
                "char_end": ans.char_ed,
                "answer_bio_ids": ans.answer_bio_ids,
                "answer_chunk_tag": ans.answer_pos_tag,
            },
            "styles": None,
            "clues": None,
        }

        # sample styles
        styles = select_questions(ans, sample_probs, config=config)
        info["styles"] = list(styles)

        # sample clues
        selected_clues = select_clues(chunklist, doc, ans, sample_probs, config=config)
        info["clues"] = selected_clues

        sampled_infos.append(info)

    result = {
        "context": sentence,
        "selected_infos": sampled_infos,
        "ans_sent_doc": doc.text,
    }
    return result
def read_sample_probs(sample_probs_path):
    """
    Load pickled sampling probabilities from ``sample_probs_path``.

    Args:
        sample_probs_path: Path of the pickle file to read.

    Returns:
        The object stored in the pickle file.

    Note:
        Unpickling can execute arbitrary code, so only point this at files
        from a trusted source.
    """
    with open(sample_probs_path, "rb") as pickle_file:
        return pkl.load(pickle_file)
def select_with_default_sampel_probs(sentence):
    """
    Run ``select`` on ``sentence`` with the bundled probability tables.

    The tables are read from ``sample_probs.pkl`` located next to this module.
    The misspelling "sampel" in the function name is kept so existing callers
    keep working.

    Args:
        sentence (str): The sentence to sample answers, styles and clues from.

    Returns:
        dict: The result of ``select(sentence, sample_probs)``.
    """
    sample_probs_path = os.path.join(current_file_dir, "sample_probs.pkl")
    sample_probs = read_sample_probs(sample_probs_path)
    # Use the caller's sentence; a hard-coded demo sentence was used before,
    # so the argument had no effect.
    return select(sentence, sample_probs)
def test():
    """Smoke test: run ``select`` on a demo sentence and print the result."""
    # Resolve the pickle next to this module rather than relative to the
    # current working directory, so the test also works outside the repo root.
    sample_probs_path = os.path.join(current_file_dir, "sample_probs.pkl")
    sample_probs = read_sample_probs(sample_probs_path)
    selection = select(
        "Bob is eating a delicious cake in Vancouver.", sample_probs)
    print(selection)
# Run the smoke test when this module is executed directly as a script.
if __name__ == "__main__": test()