21

我正在寻找一种有效的 Pythonic 方式将多个过滤器应用于元组列表。

例如,假设过滤器是这样的:

def f1(t): return t[3]<10
def f2(t): return t[0]!=1
def f3(t): return t[1] in ("lisa","eric")
def f4(t): return t[3]>2

和这样的 n 元组(即 db-records):

tuples=[
(0,'tom','...',8),
(1,'john','...',17),
(2,'lisa','...',1),
(3,'eric','...',18)
]

以下作品:

def nFilter(filters,tuples):
    if filters and tuples:
        return nFilter(filters,filter(filters.pop(),tuples))
    else: return tuples

结果如下:

>>> nFilter([f1,f2,f3],tuples)
[(2, 'lisa', '...', 1)]

>>> nFilter([f1,f2,f3,f4],tuples)
[]

但我想知道是否有更直接的方法;我想到的是函数组合(即f1(f2(...fn(tuples)...))),用于任意函数列表。在文档中有对包含函数的函数库的引用compose,但链接都已失效。

此外,由于我计划在相当大的数据集上使用它,并且可能在生产 Web 服务中使用大量过滤器,它必须是高效的,我不能说这个解决方案是否有效。

欢迎任何建议或改进。

4

7 回答 7

32

改进:用迭代代替递归

没有真正的“任意函数列表的组合函数”;但是,使用简单的 for 循环构建过滤器链非常容易:

def nFilter(filters, tuples):
    for f in filters:
        tuples = filter(f, tuples)
    return tuples

改进:按限制和速度排序过滤器

链式迭代器是如此之快,以至于总运行时间往往会被对谓词函数的调用所支配。

最好的结果可以通过对谓词进行排序以最小化总工作来获得。一般来说,最好将廉价的测试放在昂贵的测试之前,并将限制性更强的测试放在不能过滤掉很多情况的测试之前。

例子

在此示例中,谓词的成本大致相同(函数调用、元组索引和与常数的比较),但它们的限制性不同(t[2]==4过滤掉 80% 的情况,而t[0]>1and t[1]<3each 只过滤掉 40 % 的数据)。

>>> from itertools import product

>>> filters = [lambda t: t[2]==4, lambda t: t[0]>1, lambda t: t[1]<3]
>>> for tup in nFilter(filters, product(range(5), repeat=3)):
        print(tup)

(2, 0, 4)
(2, 1, 4)
(2, 2, 4)
(3, 0, 4)
(3, 1, 4)
(3, 2, 4)
(4, 0, 4)
(4, 1, 4)
(4, 2, 4)

从评论中提升的注释

  • 当输入迭代为空时,过滤器函数对谓词进行零应用。这就像在一个空列表上做一个 for 循环。

  • 每个过滤器都会减少输入封闭过滤器的数据量。因此,每个过滤器仅应用于已通过先前过滤器的数据。

  • 不要担心lambda示例中的 。它具有与常规相同的功能def。这只是编写过滤器列表的一种便捷方式。

  • 在 Python 3 中,filter()函数被更新为返回一个迭代器而不是一个列表。在 Python 2 中,您可以使用itertools.ifilter()而不是filter()来实现相同的效果。

于 2012-09-12T10:43:23.813 回答
27

你在寻找这样的东西吗?

filters = (f1,f2,f3,f4)
filtered_list = filter( lambda x: all(f(x) for f in filters), your_list )

这样做的好处是,一旦单个过滤器返回False,该列表元素就不会被包含在内。

于 2012-09-12T10:43:52.390 回答
6

生成器表达式似乎是最惯用的方法(你可以免费获得懒惰):

def nFilter(filters, tuples):
    return (t for t in tuples if all(f(t) for f in filters))

或等效的(并且可以说更具可读性):

def nFilter(filters, tuples):
    for tuple in tuples:
        if all(filter(tuple) for filter in filters):
            yield tuple
于 2012-09-12T11:11:37.363 回答
6

好吧,这里没有花哨的 itertools 或类似的东西,只是使用简单的循环避免递归和生成器的开销:

def for_loop(filters, tuples):
    for f in filters:
        tuples = filter(f, tuples)
        if not tuples: 
            return tuples
    return tuples

这是一个有点脏的基准:

import datetime
from itertools import ifilter
from timeit import Timer

def f1(t): return t[3]<10
def f2(t): return t[0]!=1
def f3(t): return t[1] in ("lisa","eric")
def f4(t): return t[3]>2

def original(filters,tuples):
    if filters and tuples:
        return original(filters,filter(filters.pop(),tuples))
    else: 
        return tuples

def filter_lambda_all(filters, tuples):
    return filter(lambda t: all(f(t) for f in filters), tuples)

def loop(filters, tuples):
    while filters and tuples:
        f = filters[0]
        del filters[0]
        tuples = filter(f, tuples)
    return tuples

def pop_loop(filters, tuples):
    while filters and tuples:
        tuples = filter(filters.pop(), tuples)
    return tuples

def for_loop(filters, tuples):
    for f in filters:
        tuples = filter(f, tuples)
        if not tuples: 
            return tuples
    return tuples


def with_ifilter(filters, tuples):
    for f in filters:
        tuples = ifilter(f, tuples)
    return tuples

_filters = [f1, f2, f3, f4]

def time(f):
    def t():
        return [    (0,'tom','...',8),
                    (1,'john','...',17),
                    (2,'lisa','...',1),
                    (3,'eric','...',18)
                ]*1000
    for i in xrange(4):
        list(f(_filters[i:] * 15,t()))

if __name__=='__main__':
    for f in (original,filter_lambda_all,loop,pop_loop,with_ifilter,for_loop):
        t = Timer(lambda: time(f))
        d = t.timeit(number=400)
        print f.__name__, d

结果:

原始 7.23815271085
filter_lambda_all 14.1629812265
loop 7.23445844453
pop_loop 7.3084566637
with_ifilter 9.2767674205
for_loop 7.02854999945

于 2012-09-12T11:11:58.233 回答
6

我建议使用以下模式在生成器上自由地应用一系列/过滤器链:

from functools import reduce, partial
from itertools import ifilter

filtered = reduce(lambda s,f: ifilter(f,s), filter_set, unfiltered)

简而言之,它在生成器上从左到右设置了一个过滤器链,并返回将所有过滤器应用于原始生成器的生成器。

如果您正在寻找一份清单,以下内容就足够了:

[reduce(lambda s,f: ifilter(f,s), (f1,f2,f3,), tuples)]

如果您希望获得一个功能,您可以将其定义为:

chain_filters = partial(reduce, lambda s,f: ifilter(f,s))

并用作:

[chain_filters((f1,f2,f3,), tuples)]

请注意,此解决方案不组成过滤器(如all()),而是将它们链接起来。如果您正在使用一些繁重的计算,您可能希望将更激进的过滤器放在链的开头,例如在数据库查询过滤器之前放置一个布隆过滤器等。

于 2016-02-16T23:26:24.827 回答
4

类似于@Raymond Hettinger,

虽然,我建议使用 itertools 中的 ifilter 作为生成器。

from itertools import ifilter

def nFilter(filters,tuples):
      return ifilter(lambda t: all(f(t) for f in filters), tuples)
于 2012-09-12T10:46:50.910 回答
0

您可以定义@tokland 的替代方案

superFilter=lambda x:all(filter(x) for filter in filters)
newTuples=filter(superFilter,tuples)

或者一个班轮

 newTuples=filter(lambda x:all(filter(x) for filter in filters),tuples)

如果你只做这个超级过滤一次这是优越的。

于 2018-08-06T14:15:08.767 回答