0

我想将 LSH 类更改为分类器,在该分类器中我可以使用 fit 方法和 predict 方法,以便在处理时间方面将其与其他最近邻方法进行比较。这是代码:

from copy import copy
from itertools import combinations

import numpy as np
from pandas import DataFrame
from sklearn.metrics.pairwise import pairwise_distances

class LSH:
    def __init__(self, data):
        self.data = data
        self.model = None

def __generate_random_vectors(self, num_vector, dim):
    """Draw the random projection vectors used for hashing.

    Returns an array of shape ``(dim, num_vector)`` whose entries are
    sampled from the standard normal distribution via NumPy's global
    random state; each column is one random vector.
    """
    shape = (dim, num_vector)
    return np.random.standard_normal(shape)

def train(self, num_vector, seed=None):
    """Hash every row of ``self.data`` into a bin and store the model.

    Args:
        num_vector: Number of random vectors, i.e. the number of bits in
            each bin index.
        seed: Optional seed passed to ``np.random.seed`` before the random
            vectors are drawn. Left untouched when ``None``.

    Returns:
        ``self``, with ``self.model`` set to a dict holding the keys
        ``'bin_indices'``, ``'table'``, ``'random_vectors'`` and
        ``'num_vector'``.
    """
    n_features = self.data.shape[1]
    if seed is not None:
        np.random.seed(seed)

    projections = self.__generate_random_vectors(num_vector, n_features)

    # Weights that turn a row of bits (most significant first) into an int.
    bit_weights = 1 << np.arange(num_vector - 1, -1, -1)

    # One bit per random vector: which side of it the row falls on.
    sign_bits = self.data.dot(projections) >= 0
    bin_codes = sign_bits.dot(bit_weights)

    # buckets[code] lists the row positions of all data hashed to `code`.
    buckets = {}
    for row_position, code in enumerate(bin_codes):
        buckets.setdefault(code, []).append(row_position)

    self.model = {
        'bin_indices': bin_codes,
        'table': buckets,
        'random_vectors': projections,
        'num_vector': num_vector,
    }
    return self

def __search_nearby_bins(self, query_bin_bits, table, search_radius=2, initial_candidates=None):
    """Collect the ids stored in every bin exactly ``search_radius`` bit flips away.

    Args:
        query_bin_bits: 1-D array of bits (one per random vector) giving
            the bin of the query. It is copied, never modified.
        table: Mapping from integer bin index to a list of ids.
        search_radius: Number of bits to flip simultaneously. ``0`` looks
            up the query's own bin only.
        initial_candidates: Optional iterable of ids to start from. It is
            copied into a fresh set, so the caller's object is not mutated.
            ``None`` (the default) starts from an empty set.

    Returns:
        A new ``set`` holding the initial candidates plus every id found in
        the visited bins.
    """
    num_vector = self.model['num_vector']
    powers_of_two = 1 << np.arange(num_vector - 1, -1, -1)

    # Always work on a private set: avoids a shared mutable default and
    # lets callers pass any iterable (list, tuple, frozenset, ...).
    candidate_set = set() if initial_candidates is None else set(initial_candidates)

    for different_bits in combinations(range(num_vector), search_radius):
        # Flip the selected bits on a copy of the query's bit vector.
        alternate_bits = copy(query_bin_bits)
        for i in different_bits:
            alternate_bits[i] = 1 if alternate_bits[i] == 0 else 0

        # Convert the new bit vector to an integer bin index.
        nearby_bin = alternate_bits.dot(powers_of_two)

        # Add the ids stored in that bin, if the bin exists at all.
        if nearby_bin in table:
            candidate_set.update(table[nearby_bin])

    return candidate_set

def query(self, query_vec, k, max_search_radius, initial_candidates=None):
    """Return the ``k`` closest candidates to ``query_vec`` by cosine distance.

    Bins within ``0 .. max_search_radius`` bit flips of the query's bin are
    searched, the ids found are accumulated across all radii, and the
    candidates are ranked by their cosine distance to the query.

    Args:
        query_vec: Query row; it is passed to ``pairwise_distances`` and
            projected with ``.dot`` onto the stored random vectors.
        k: Maximum number of neighbours to return.
        max_search_radius: Largest number of bit flips to explore
            (inclusive).
        initial_candidates: Optional iterable of ids that are always part
            of the candidate set. It is copied, not mutated.

    Returns:
        A DataFrame with columns ``'id'`` and ``'distance'`` holding at most
        ``k`` rows with the smallest distances. Empty when no candidate was
        found.

    Raises:
        SystemExit: If ``train`` has not been called yet (``self.model`` is
            falsy).
    """
    if not self.model:
        print('Model not yet build. Exiting!')
        # Same effect as the interactive ``exit(-1)`` helper, without
        # depending on the ``site`` module having installed it.
        raise SystemExit(-1)

    data = self.data
    table = self.model['table']
    random_vectors = self.model['random_vectors']

    bin_index_bits = (query_vec.dot(random_vectors) >= 0).flatten()

    # Start from the caller's candidates and keep growing the same set, so
    # results from smaller radii (including the query's own bin at radius
    # 0) are not thrown away when a larger radius is searched.
    candidate_set = set() if initial_candidates is None else set(initial_candidates)
    for search_radius in range(max_search_radius + 1):
        candidate_set = self.__search_nearby_bins(
            bin_index_bits, table, search_radius, initial_candidates=candidate_set)

    if not candidate_set:
        # Nothing to rank; indexing ``data`` with an empty id array would fail.
        return DataFrame({'id': np.array([], dtype=int),
                          'distance': np.array([], dtype=float)})

    # Fix the candidate order once so ids and distances stay aligned and
    # ties are broken deterministically.
    candidate_ids = sorted(candidate_set)
    nearest_neighbors = DataFrame({'id': candidate_ids})
    candidates = data[np.array(candidate_ids), :]
    nearest_neighbors['distance'] = pairwise_distances(
        candidates, query_vec, metric='cosine').flatten()

    return nearest_neighbors.nsmallest(k, 'distance')
4

0 回答 0