0
点赞
收藏
分享

微信扫一扫

word2vector训练数据集整理(代码实现)

import math
import os
import random
import torch
import dltools
from matplotlib import pyplot as plt
#读取数据集
def read_ptb():
    """将PTB数据集加载到文本行的列表中"""
    with open('./ptb/ptb.train.txt') as f:
        raw_text = f.read()
    return [line.split() for line in raw_text.split('\n')]

sentences = read_ptb()
print(f'# sentences数:{len(sentences)}')
#构建词表,并把频次低于10的词元替换为<unk>
vocab = dltools.Vocab(sentences, min_freq=10)
print(f'# vocab_size: {len(vocab)}')
#向下采样
def subsample(sentences, vocab):
    #排除未知词元‘<unk>’,对sentences进行处理
    sentences = [[token for token in line if vocab[token] != vocab.unk] for line in sentences]
    #对排除unk的sentences进行tokens计数  (未去重)
    counter = dltools.count_corpus(sentences)
    #聚合
    num_tokens = sum(counter.values())
    
    #若在下采样期间保留词元, 则返回True    
    def keep(token):
        return (random.uniform(0, 1) < math.sqrt(1e-4 / (counter[token] / num_tokens)))
    
    #降低冠词等无意义词的频次,  词频低越容易保留
    return ([[token for token in line if keep(token)] for line in sentences], counter)  

subsampled, counter = subsample(sentences, vocab)
#画出下采样之后的图, 采取下采样前后的20条数据
before = [len(x) for x in sentences[:20]]
after = [len(x) for x in subsampled[:20]]
x = range(len(before))
plt.bar(x, height=before, width=0.4, alpha=0.8, color='red', label='before')
#[i + 0.4 for i in x] 是X轴刻度
plt.bar([i + 0.4 for i in x], height=after, width=0.4, color='green', label='after')
plt.xlabel('tokens per sentences')
plt.ylabel('count')
plt.legend(['before', 'after'])
plt.show()

def compare_counts(token):
    return (f'"{token}"的数量:' f'之前={sum([l.count(token) for l in sentences])}, ' 
            f'之后={sum([l.count(token) for l in subsampled])}')

compare_counts('the')
compare_counts('publishing')
#将词元映射到他们在语料库中的索引
corpus = [vocab[line] for line in subsampled]
corpus[:3]
#中心词和上下文词的提取
def get_centers_and_contetxs(corpus, max_window_size):
    """返回skip_gram模型中的中心词和上下文词"""
    centers, contexts = [], []
    for line in corpus:
        #要形成“中心词——上下文词对”, 每个句子至少需要有2个词
        if len(line) < 2:
            continue
        centers += line #把满足条件的line放于中心词列表中
        for idx, i in enumerate(range(len(line))):   #上下文窗口的中间token的索引为i
            window_size = random.randint(1, max_window_size)
            print('中心词 {} 的窗口大小:{}'.format(idx, window_size))
            indices = list(range(max(0, i - window_size), min(len(line), i + window_size + 1)))
            #从上下文词中排除中心词
            indices.remove(i)
            contexts.append([line[x] for x in indices])
    return centers, contexts
#假设数据
tiny_dataset = [list(range(7)), list(range(7,10))]
print('数据集', tiny_dataset)
#表示解压函数,用于将打包的元组解压回原来的序列
for center, context in zip(*get_centers_and_contetxs(tiny_dataset, 2)):
    print('中心词:',center, '的上下文词是:', context)
#在PTB上进行中心词和背景词提取
#max_window_size=5  业界常用到的数值,效果比较好
all_centers, all_contexts = get_centers_and_contetxs(corpus, 5)
'“中心词-上下文词对”的数量:{}'.format( sum([len(contexts) for contexts in all_contexts]))
#负采样_按权重抽取
class RandomGenerator:
    """根据n个采样权重在{1,2,,3,...n}中随机抽取"""
    def __init__(self, sampling_weights):
        #Exclude 排除
        self.population = list(range(1, len(sampling_weights) + 1))  #对采样数据的编号
        self.sampling_weights = sampling_weights
        self.candidates = []  #采样结果
        self.i = 0
        
    def draw(self):
        if self.i == len(self.candidates):
            #缓存k个随机采样的结果    # population:集群。 weights:相对权重。 cum_weights:累加权重。 k:选取次数
            self.candidates = random.choices(self.population, self.sampling_weights, k=10000)  #k最大值=10000(采样数量)
            self.i = 0
        self.i += 1
        return self.candidates[self.i - 1]
#假设数据验证
generator = RandomGenerator([2, 3, 4])
[generator.draw() for _ in range(10)]
#返回负采样中的噪声词
def get_negatives(all_contetxs, vocab, counter, K):
    #索引为1,2,....(索引0是此表中排除的未知标记)
    sampling_weights = [counter[vocab.to_tokens(i)]**0.75 for i in range(1, len(vocab))]
    all_negatives, generator = [], RandomGenerator(sampling_weights)
    for contexts in all_contetxs:  #遍历背景词
        negatives = []
        while len(negatives) < len(contexts) * K:
            neg = generator.draw()
            #噪声词不能是上下文词
            if neg not in contexts:
                negatives.append(neg)
        all_negatives.append(negatives)
    return all_negatives

all_negatives = get_negatives(all_contexts, vocab, counter, 5)
        
# 小批量操作
def batchify(data):
    """返回带有负采样的跳元模型的小批量样本"""
    max_len = max(len(c) + len(n) for _, c, n in data)
    centers, contexts_negatives, masks, labels = [], [], [], []
    for center, context, negative in data:
        cur_len = len(context) + len(negative)
        centers += [center]
        contexts_negatives += \
            [context + negative + [0] * (max_len - cur_len)]
        masks += [[1] * cur_len + [0] * (max_len - cur_len)]
        labels += [[1] * len(context) + [0] * (max_len - len(context))]
    return (torch.tensor(centers).reshape((-1, 1)), torch.tensor(
        contexts_negatives), torch.tensor(masks), torch.tensor(labels))
#小批量的例子
x_1 = (1, [2, 2], [3, 3, 3, 3])
x_2 = (1, [2, 2, 2], [3, 3])
batch = batchify((x_1, x_2))

names = ['centers', 'contexts_negative', 'masks', 'labels']
for name, data in zip(names, batch):
    print(name, '=', data)
#整合后的数据加载处理模块
def load_data_ptb(batch_size, max_window_size, num_noise_words):
    """下载PTB数据集, 然后将其加载到内存中"""
    #加载PTB数据集
    sentences = read_ptb()
    #获取词汇表
    vocab = dltools.Vocab(sentences, min_freq=10)
    #下采样
    subsampled, counter = subsample(sentences, vocab)
    #语料库
    corpus = [vocab[line] for line in subsampled]
    #获取中心词与背景词
    all_centers, all_contexts = get_centers_and_contetxs(corpus, max_window_size)
    #获取噪声词
    get_negatives(all_contetxs, vocab, counter, num_noise_words)
    
    class PTBDataset(torch.utils.data.Dataset):
        def __init__(self, centers, contexts, negatives):
            assert len(centers) == len(contexts) == len(negatives)
            self.centers = centers
            self.contexts = contexts
            self.negatives = negatives
            
        def __getitem__(self, index):
            return (self.centers[index], self.contexts[index],
                    self.negatives[index])
        
        def __len__(self):
            return len(self.centers)

        
    dataset = PTBDataset(all_centers, all_contexts, all_negatives)
    
    data_iter = torch.utils.data.DataLoader(dataset, batch_size, shuffle=True, collate_fn = batchify)
    return data_iter, vocab
data_iter, vocab = load_data_ptb(5, 5, 5)
for batch in data_iter:
    for name, data in zip(names, batch):
        print(name, 'shape:', data.shape)
    break
batch

举报

相关推荐

0 条评论