5

我正在尝试构建一个函数,该函数可以用作我正在映射的 RxPy 流的处理程序。我拥有的函数需要访问定义该变量的范围之外的变量,对我来说,这意味着我需要使用某种闭包。所以我到达 functools.partial 以关闭一个变量并返回一个部分函数,​​我可以将其作为观察者传递给我的流。

但是,这样做会导致以下结果:

Traceback (most recent call last):
  File "retry/example.py", line 46, in <module>
    response_stream = message_stream.flat_map(functools.partial(message_handler, context=context))
  File "/home/justin/virtualenv/retry/local/lib/python2.7/site-packages/rx/linq/observable/selectmany.py", line 67, in select_many
    selector = adapt_call(selector)
  File "/home/justin/virtualenv/retry/local/lib/python2.7/site-packages/rx/internal/utils.py", line 37, in adapt_call_1
    argnames, varargs, kwargs = getargspec(func)[:3]
  File "/usr/lib/python2.7/inspect.py", line 816, in getargspec
    raise TypeError('{!r} is not a Python function'.format(func))
TypeError: <method-wrapper '__call__' of functools.partial object at 0x2ce6cb0> is not a Python function

这是一些重现问题的示例代码:

from __future__ import absolute_import
from rx import Observable, Observer
from pykafka import KafkaClient
from pykafka.common import OffsetType
import logging
import requests
import functools


logger = logging.basicConfig()


def puts(thing):
    print thing


def message_stream(consumer):
    def thing(observer):
        for message in consumer:
            observer.on_next(message)

    return Observable.create(thing)


def message_handler(message, context=None):
    def req():
        return requests.get('http://httpbin.org/get')

    return Observable.start(req)


def handle_response(message, response, context=None):
    consumer = context['consumer']
    producer = context['producer']
    t = 'even' if message % 2 == 0 else 'odd'
    return str(message) + ': ' + str(response) + ' - ' + t + ' | ' + str(consumer) + ' | ' + producer


consumer = ['pretend', 'these', 'are', 'kafka', 'messages']
producer = 'some producer'
context = {
    'consumer': consumer,
    'producer': producer
}
message_stream = message_stream(consumer)
response_stream = message_stream.flat_map(functools.partial(message_handler, context=context))
message_response_stream = message_stream.zip(response_stream, functools.partial(handle_response, context=context))
message_stream.subscribe(puts)

问题似乎是我的部分函数False在调用inspect.isfunction.

如何使我的部分函数通过此检查?有没有办法轻松地将部分函数转换为“真实”函数类型?

4

1 回答 1

6

你问它是否真的是一个函数,它告诉你不是一个函数。它是一个方法包装器。

你想鸭式

>>> def printargs(*args):
...     print args

>>> import inspect
>>> from functools import partial
>>> inspect.isfunction(printargs)
True
>>> f = partial(printargs, 1)
>>> inspect.isfunction(f)
False
# try duck-typing, see if the variable is callable
# check does it work for a method-wrapper?
>>> callable(f)
True
# check an integer, which should be false
>>> callable(1)
False
# ensure it works on an actual function
>>> callable(printargs)
True

这就是你鸭式的原因。你不在乎它是否是一个函数。你关心它是否像一个函数一样起作用。

编辑:如果足够绝望,您可以编写一个类并传递对类中函数的引用。

class A():
    def __init__(self, frozen, *args, **kwds):
        self.frozen = frozen
        self.args = args
        self.kwds = kwds

    def call(self):
        self.frozen(*self.args, **self.kwds)

然后只需使用 A(f).call 作为您的包装器。

>>> f_ = A(f)
>>> inspect.ismethod(f_.call)
True
>>> f_.call()
(1,)

只要 ismethod 有效,它就有效。

如果没有,你真的需要一个装饰器。

最终编辑:如果您真的很绝望并且不想编写自定义装饰器,您可以使用带有元组的 lambda 函数来传递来创建一个类似部分的函数。

前任。:

>>> import inspect
>>> def printargs(*args):
...     print args
>>> a = (1,2,3)
>>> f = lambda x: printargs(*x)
>>> f(a)
(1, 2, 3)
>>> inspect.isfunction(f)
True
于 2015-06-17T22:01:49.183 回答