【算法】局部敏感哈希 LSH 的 Python 实现

您所在的位置:网站首页 sivic思域10 【算法】局部敏感哈希 LSH 的 Python 实现

【算法】局部敏感哈希 LSH 的 Python 实现

2023-12-05 22:15| 来源: 网络整理| 查看: 265

一、哈希算法

普通的哈希算法:把任意长度的输入通过散列算法变换成固定长度的输出,该输出就是散列值。

最理想的是所有不同的输入都可以映射到散列值,但是存在这种可能性的。当不同的输入映射到相同的散列值时,就称为碰撞冲突。

哈希算法是要避免碰撞冲突,而局部敏感哈希(Locality-Sensitive Hashing, 下面我们简称LSH)则相反,是要创造更多的碰撞冲突。

二、局部敏感哈希

在很多领域中,经常会使用最近邻查找,例如人脸匹配、指纹匹配等,输入一个人脸数据(一般对应一个向量),然后跟数据库中的所有人脸进行比对,找出最接近的一个人脸。

最简单粗暴的实现就是线性查找匹配,即与所有数据逐一比较,留下最相似的。显而易见,这种方法极其耗时,存在很大的性能问题,特别是数据量和向量维度特别大的情况。

或者,为了加快查询速度,需要采用索引的方法,例如KD-Tree。

除此之外,就是我们这篇文章的主角:LSH。

它的本质还是哈希算法,但不同的时,针对于相似的输入,我们要提高碰撞冲突的概率,而对于差异很大的向量,则需要降低碰撞冲突的概率。

所以LSH的哈希函数(散列算法)要有这样的特性:相似的输入经过特定的哈希函数映射之后的哈希值,它们很大概率是相同的;反之,当差异很大的输入经过哈希函数映射后的哈希值相同的概率很小。

请添加图片描述

方法步骤:

在离线状态下,将所有的向量通过特定的哈希函数映射到对应的索引位置;输入一个向量,用同样的哈希函数计算哈希值,找到对应哈希值位置的所有向量;(比如有10个不同的哈希函数,就计算10次)根据对应的距离度量方法,与第2步查到的所有向量计算距离;筛选出距离最小的n个向量,即为与输入向量最为相似的n个结果。 三、Python实现LSH import numpy as np from typing import List, Union class EuclideanLSH: def __init__(self, tables_num: int, a: int, depth: int): """ :param tables_num: hash_table的个数 :param a: a越大,被纳入同个位置的向量就越多,即可以提高原来相似的向量映射到同个位置的概率, 反之,则可以降低原来不相似的向量映射到同个位置的概率。 :param depth: 向量的维度数 """ self.tables_num = tables_num self.a = a # 为了方便矩阵运算,调整了shape,每一列代表一个hash_table的随机向量 self.R = np.random.random([depth, tables_num]) self.b = np.random.uniform(0, a, [1, tables_num]) # 初始化空的hash_table self.hash_tables = [dict() for i in range(tables_num)] def _hash(self, inputs: Union[List[List], np.ndarray]): """ 将向量映射到对应的hash_table的索引 :param inputs: 输入的单个或多个向量 :return: 每一行代表一个向量输出的所有索引,每一列代表位于一个hash_table中的索引 """ # H(V) = |V·R + b| / a,R是一个随机向量,a是桶宽,b是一个在[0,a]之间均匀分布的随机变量 hash_val = np.floor(np.abs(np.matmul(inputs, self.R) + self.b) / self.a) return hash_val def insert(self, inputs): """ 将向量映射到对应的hash_table的索引,并插入到所有hash_table中 :param inputs: :return: """ # 将inputs转化为二维向量 inputs = np.array(inputs) if len(inputs.shape) == 1: inputs = inputs.reshape([1, -1]) hash_index = self._hash(inputs) for inputs_one, indexs in zip(inputs, hash_index): for i, key in enumerate(indexs): # i代表第i个hash_table,key则为当前hash_table的索引位置 # inputs_one代表当前向量 self.hash_tables[i].setdefault(key, []).append(tuple(inputs_one)) def query(self, inputs, nums=20): """ 查询与inputs相似的向量,并输出相似度最高的nums个 :param inputs: 输入向量 :param nums: :return: """ hash_val = self._hash(inputs).ravel() candidates = set() # 将相同索引位置的向量添加到候选集中 for i, key in enumerate(hash_val): candidates.update(self.hash_tables[i][key]) # 根据向量距离进行排序 candidates = sorted(candidates, key=lambda x: self.euclidean_dis(x, inputs)) return candidates[:nums] @staticmethod def euclidean_dis(x, y): """ 计算欧式距离 :param x: :param y: :return: """ x = np.array(x) y = np.array(y) return np.sqrt(np.sum(np.power(x - y, 2))) if __name__ == '__main__': data = np.random.random([10000, 100]) query = np.random.random([100]) lsh = EuclideanLSH(10, 1, 100) lsh.insert(data) res = lsh.query(query, 20) res = np.array(res) print(np.sum(np.power(res - query, 2), axis=-1)) sort = np.argsort(np.sum(np.power(data - query, 2), axis=-1)) print(np.sum(np.power(data[sort[:20]] - query, 2), axis=-1)) print(np.sum(np.power(data[sort[-20:]] - query, 2), axis=-1))

请添加图片描述

四、使用第三方库datasketch实现LSH 4.1 官方示例 from datasketch import MinHash, MinHashLSH set1 = set(['minhash', 'is', 'a', 'probabilistic', 'data', 'structure', 'for', 'estimating', 'the', 'similarity', 'between', 'datasets']) set2 = set(['minhash', 'is', 'a', 'probability', 'data', 'structure', 'for', 'estimating', 'the', 'similarity', 'between', 'documents']) set3 = set(['minhash', 'is', 'probability', 'data', 'structure', 'for', 'estimating', 'the', 'similarity', 'between', 'documents']) m1 = MinHash(num_perm=128) m2 = MinHash(num_perm=128) m3 = MinHash(num_perm=128) for d in set1: m1.update(d.encode('utf8')) for d in set2: m2.update(d.encode('utf8')) for d in set3: m3.update(d.encode('utf8')) # Create LSH index lsh = MinHashLSH(threshold=0.5, num_perm=128) lsh.insert("m2", m2) lsh.insert("m3", m3) result = lsh.query(m1) print("Approximate neighbours with Jaccard similarity > 0.5", result) 4.2 LSH算法 import numpy as np from datasketch import WeightedMinHashGenerator from datasketch import MinHashLSH from tqdm import tqdm all_data = np.random.random([10000, 100]) query = np.random.random([100]) mg = WeightedMinHashGenerator(all_data.shape[1]) lsh = MinHashLSH(threshold=0.7) for index, value in tqdm(enumerate(all_data)): m_hash = mg.minhash(value) lsh.insert(index, m_hash) result = lsh.query(mg.minhash(query)) print(result) # 开始验证 print(np.sum(np.power(all_data[result] - query, 2), axis=-1)) # 计算LSH的结果与query的结果的差距 total_data = np.concatenate((all_data, [query])) sort = np.argsort(np.sum(np.power(total_data - query, 2), axis=-1)) # 线性查找真正的最接近的曲线 print(np.sum(np.power(total_data[sort[:20]] - query, 2), axis=-1)) # 计算最接近的曲线 print(np.sum(np.power(total_data[sort[-20:]] - query, 2), axis=-1)) 4.3 MinHashLSHForest

MinHashLSHForest可以选择Top K的内容

import numpy as np from datasketch import WeightedMinHashGenerator from datasketch import MinHashLSHForest from tqdm import tqdm all_data = np.random.random([10000, 100]) query = np.random.random([100]) mg = WeightedMinHashGenerator(all_data.shape[1]) forest = MinHashLSHForest() for index, value in tqdm(enumerate(all_data)): m_hash = mg.minhash(value) forest.add(index, m_hash) forest.index() # 重要!在此之后才可以使用查询功能 result = forest.query(mg.minhash(query), 20) # 选择top20 print(result) # 开始验证 print(np.sum(np.power(all_data[result] - query, 2), axis=-1)) # 计算LSH的结果与query的结果的差距 total_data = np.concatenate((all_data, [query])) sort = np.argsort(np.sum(np.power(total_data - query, 2), axis=-1)) # 线性查找真正的最接近的曲线 print(np.sum(np.power(total_data[sort[:20]] - query, 2), axis=-1)) # 计算最接近的曲线 print(np.sum(np.power(total_data[sort[-20:]] - query, 2), axis=-1)) 参考资料 局部敏感哈希(LSH):高维数据下的最近邻查找python使用局部敏感性哈希算法,在海量数据中查询相似序列局部敏感哈希 python实现_局部敏感哈希(LSH)


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3