我正在尝试构建一个函数,该函数可以用作我正在映射的 RxPy 流的处理程序。我拥有的函数需要访问定义该变量的范围之外的变量,对我来说,这意味着我需要使用某种闭包。所以我到达 functools.partial 以关闭一个变量并返回一个部分函数,我可以将其作为观察者传递给我的流。
但是,这样做会导致以下结果:
Traceback (most recent call last):
File "retry/example.py", line 46, in <module>
response_stream = message_stream.flat_map(functools.partial(message_handler, context=context))
File "/home/justin/virtualenv/retry/local/lib/python2.7/site-packages/rx/linq/observable/selectmany.py", line 67, in select_many
selector = adapt_call(selector)
File "/home/justin/virtualenv/retry/local/lib/python2.7/site-packages/rx/internal/utils.py", line 37, in adapt_call_1
argnames, varargs, kwargs = getargspec(func)[:3]
File "/usr/lib/python2.7/inspect.py", line 816, in getargspec
raise TypeError('{!r} is not a Python function'.format(func))
TypeError: <method-wrapper '__call__' of functools.partial object at 0x2ce6cb0> is not a Python function
这是一些重现问题的示例代码:
from __future__ import absolute_import
from rx import Observable, Observer
from pykafka import KafkaClient
from pykafka.common import OffsetType
import logging
import requests
import functools
logger = logging.basicConfig()
def puts(thing):
print thing
def message_stream(consumer):
def thing(observer):
for message in consumer:
observer.on_next(message)
return Observable.create(thing)
def message_handler(message, context=None):
def req():
return requests.get('http://httpbin.org/get')
return Observable.start(req)
def handle_response(message, response, context=None):
consumer = context['consumer']
producer = context['producer']
t = 'even' if message % 2 == 0 else 'odd'
return str(message) + ': ' + str(response) + ' - ' + t + ' | ' + str(consumer) + ' | ' + producer
consumer = ['pretend', 'these', 'are', 'kafka', 'messages']
producer = 'some producer'
context = {
'consumer': consumer,
'producer': producer
}
message_stream = message_stream(consumer)
response_stream = message_stream.flat_map(functools.partial(message_handler, context=context))
message_response_stream = message_stream.zip(response_stream, functools.partial(handle_response, context=context))
message_stream.subscribe(puts)
问题似乎是我的部分函数False
在调用inspect.isfunction
.
如何使我的部分函数通过此检查?有没有办法轻松地将部分函数转换为“真实”函数类型?