0

我主要写 PHP,对 Python 不太熟悉,所以如果我的问题很基础,请见谅。我有两个使用 Gearman 的 PHP 脚本(client.php 和 worker.php),需要把它们改写成 Python 版本。我已经完成了一部分,但现在卡住了。首先,这是我的两个脚本:

client.py

#!/usr/bin/env python
import gearman
import json

# DNS blocklist (RBL) zones; sent to the worker as the "rbls" field of the
# job payload built further down in this script.
RBLS = [
    'b.barracudacentral.org',
    'bl.emailbasura.org'
]

# Address to check; sent to the worker as the "ip" field of the job payload.
IP = '1.2.3.4'

def check_request_status(job_request):
    """Print a one-line summary of a submitted Gearman job request.

    Args:
        job_request: The request object returned by
            ``GearmanClient.submit_job``. The attributes read here are
            ``complete``, ``timed_out``, ``state``, ``result`` and
            ``job.unique``.

    Nothing is printed when the request is neither complete, timed out,
    nor in the unknown state.
    """
    # The unique id lives on the wrapped job; the request itself is only
    # known to expose it through ``job`` (as the "finished" branch does).
    unique = job_request.job.unique

    if job_request.complete:
        # print(<single expression>) behaves the same on Python 2 and 3.
        print("Job %s finished!  Result: %s - %s"
              % (unique, job_request.state, job_request.result))
        return

    if job_request.timed_out:
        print("Job %s timed out!" % unique)
        return

    # Imported here because the script only does ``import gearman`` at the
    # top, which left the bare name JOB_UNKNOWN undefined.
    from gearman.constants import JOB_UNKNOWN

    if job_request.state == JOB_UNKNOWN:
        print("Job %s connection failed!" % unique)


# Bundle the address and the blocklist zones into a single JSON string so
# they can travel as the job's payload.
data = {"ip": IP, "rbls": RBLS}
serialized_data = json.dumps(data)

# Submit the payload to the "runcheck" task via the Gearman server at
# localhost:4730, then report the state of the returned request.
# NOTE(review): presumably submit_job blocks until the job finishes (the
# variable name suggests so) - confirm against the python-gearman docs.
gm_client = gearman.GearmanClient(['localhost:4730'])
completed_job_request = gm_client.submit_job("runcheck", serialized_data)
check_request_status(completed_job_request)

worker.py

#!/usr/bin/env python

import gearman
import sys
import socket
import re
import json
from dns.resolver import Resolver, NXDOMAIN, NoNameservers, Timeout, NoAnswer
from threading import Thread

# This hardcoded RBLS need to be passed by gearman client script
# RBLS = ['xbl.spamhaus.org', 'zen.spamhaus.org']

class Lookup(Thread):
    """Thread that checks one query name against a single DNS blocklist.

    The outcome is written into ``listed[dnslist]``, a dict the caller must
    create before the thread runs, using these keys:

    * ``LISTED`` / ``HOST`` - set when the A query returns a record.
    * ``TEXT`` - first TXT record, when one could be fetched.
    * ``ERROR`` - True only when the A lookup itself failed.
    * ``ERRORTYPE`` - the exception class raised by the A lookup, if any.

    Args:
        host: Fully built query name (reversed address + blocklist zone).
        dnslist: Blocklist zone; the key of this thread's result entry.
        listed: Shared result dict; only ``listed[dnslist]`` is modified.
        resolver: Object exposing ``query(name, rdtype)``.
    """

    def __init__(self, host, dnslist, listed, resolver):
        Thread.__init__(self)
        self.host = host
        self.listed = listed
        self.dnslist = dnslist
        self.resolver = resolver

    def run(self):
        """Run the A (and, if listed, TXT) queries and record the outcome."""
        entry = self.listed[self.dnslist]
        try:
            host_record = self.resolver.query(self.host, "A")
        except NXDOMAIN:
            # NXDOMAIN is the regular "not listed" answer of a blocklist,
            # not a failure of the lookup.
            entry['ERROR'] = False
            entry['ERRORTYPE'] = NXDOMAIN
        except NoNameservers:
            self._record_failure(NoNameservers)
        except Timeout:
            self._record_failure(Timeout)
        except NameError:
            # Kept from the original best-effort handling.
            self._record_failure(NameError)
        except NoAnswer:
            self._record_failure(NoAnswer)
        else:
            if len(host_record) > 0:
                entry['LISTED'] = True
                entry['HOST'] = host_record[0].address
                text = self._query_text()
                if text is not None:
                    entry['TEXT'] = text
            entry['ERROR'] = False

    def _record_failure(self, error_type):
        """Flag this blocklist's entry as failed with *error_type*."""
        entry = self.listed[self.dnslist]
        entry['ERROR'] = True
        entry['ERRORTYPE'] = error_type

    def _query_text(self):
        """Return the first TXT record as text, or None if unavailable.

        The TXT record is optional extra information: a failure here must
        not turn an already confirmed listing into an error.
        """
        try:
            text_record = self.resolver.query(self.host, "TXT")
        except (NXDOMAIN, NoNameservers, Timeout, NoAnswer):
            return None
        if len(text_record) == 0:
            return None
        return "\n".join(self._as_text(chunk) for chunk in text_record[0].strings)

    @staticmethod
    def _as_text(chunk):
        """Decode *chunk* when it is bytes that are not already ``str``."""
        # On Python 2 ``bytes is str`` so the chunk is returned untouched.
        if isinstance(chunk, bytes) and not isinstance(chunk, str):
            return chunk.decode("utf-8", "replace")
        return chunk

class RBLSearch(object):
    """Check one IPv4 address against a set of DNS blocklists.

    Args:
        lookup_host: Dotted IPv4 address to look up.
        rbls: Iterable of blocklist zone names. When omitted, a
            module-level ``RBLS`` is used, as the original code did, so
            existing ``RBLSearch(host)`` callers keep working.
    """

    def __init__(self, lookup_host, rbls=None):
        self.lookup_host = lookup_host
        self.rbls = rbls
        self._listed = None
        self.resolver = Resolver()
        self.resolver.timeout = 0.2
        self.resolver.lifetime = 1.0

    def search(self):
        """Query every blocklist once and return the result dict.

        The first call starts one ``Lookup`` thread per blocklist and waits
        for all of them; later calls return the cached dict. The dict maps
        ``'SEARCH_HOST'`` to the looked-up address and each blocklist name
        to that list's result entry.
        """
        if self._listed is None:
            # The blocklists must come from the instance: a name bound
            # inside the job handler is local to it and invisible here.
            blocklists = RBLS if self.rbls is None else self.rbls
            # Blocklists are queried with the octets in reverse order.
            reversed_host = ".".join(reversed(self.lookup_host.split(".")))
            self._listed = {'SEARCH_HOST': self.lookup_host}
            threads = []
            for zone in blocklists:
                self._listed[zone] = {'LISTED': False}
                query = Lookup("%s.%s" % (reversed_host, zone), zone,
                               self._listed, self.resolver)
                threads.append(query)
                query.start()
            for thread in threads:
                thread.join()
        return self._listed
    listed = property(search)

    def print_results(self):
        """Print a report of the lists that returned a listing.

        Entries flagged with ``ERROR`` are skipped silently.
        """
        listed = self.listed
        print("")
        print("--- DNSBL Report for %s ---" % listed['SEARCH_HOST'])
        for key, entry in listed.items():
            if key == 'SEARCH_HOST' or entry.get('ERROR'):
                continue
            if entry['LISTED']:
                print("Results for %s: %s" % (key, entry['LISTED']))
                print("  + Host information: %s" % entry['HOST'])
            if 'TEXT' in entry:
                print("    + Additional information: %s" % entry['TEXT'])


# Whole-string dotted-quad check: raw string, escaped dots, anchored at the
# end so that names such as "1.2.3.4.example.com" are not taken for an IP.
_IPV4_PATTERN = re.compile(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\Z")


def task_listener_runcheck(gearman_worker, gearman_job):
    """Gearman task handler: run a blocklist check for one job.

    ``gearman_job.data`` must be a JSON object with the keys ``ip`` (an
    IPv4 address or a host name) and ``rbls`` (list of blocklist zones).

    Args:
        gearman_worker: The worker that received the job (unused).
        gearman_job: The job; only its ``data`` attribute is read.

    Returns:
        A fixed status string, also when the host could not be resolved.
    """
    jdata = json.loads(gearman_job.data)
    host = jdata['ip']
    rbls = jdata['rbls']
    ip = host

    print("Looking up: %s (please wait)" % host)
    if not _IPV4_PATTERN.match(host):
        try:
            ip = socket.gethostbyname(host)
            print("Hostname %s resolved to ip %s" % (host, ip))
        except socket.error:
            print("Hostname %s can't be resolved" % host)
            ip = ""
    if ip:
        # Hand the job's blocklists to the searcher explicitly.
        searcher = RBLSearch(ip, rbls)
        searcher.print_results()
    return "RunCheck was successfull"


# Create a worker bound to the Gearman job server at localhost:4730.
gm_worker = gearman.GearmanWorker(['localhost:4730'])

gm_worker.set_client_id('python-worker')
# Route "runcheck" jobs to task_listener_runcheck, then start processing.
# NOTE(review): work() presumably loops until interrupted - confirm.
gm_worker.register_task('runcheck', task_listener_runcheck)
gm_worker.work()

以下是这两个脚本的工作方式:client.py 将 IP 地址和 RBL 列表传递给 worker.py,然后 worker 取得该 IP 地址,并在所有 RBL 中逐一查询。

问题是我不知道如何在 RBLSearch 类中使用 RBLS。如果我在脚本开头把 RBLS 硬编码为模块级变量,它就可以工作(参见 worker.py 第 12 行被注释掉的那一行);但如果我在 task_listener_runcheck 中定义 RBLS,它只是该函数的局部变量,RBLSearch 访问不到,所以不起作用。

4

1 回答 1

1

我已经解决了这个问题。下面是修改后的版本(供有需要的人参考):

worker.py

#!/usr/bin/env python

import gearman
import sys
import socket
import re
import json
from dns.resolver import Resolver, NXDOMAIN, NoNameservers, Timeout, NoAnswer
from threading import Thread

class Lookup(Thread):
    """Thread that checks one query name against a single DNS blocklist.

    The outcome is written into ``listed[dnslist]``, a dict the caller must
    create before the thread runs, using these keys:

    * ``LISTED`` / ``HOST`` - set when the A query returns a record.
    * ``TEXT`` - first TXT record of a listing, or a description of the
      failure when the A lookup failed.
    * ``ERROR`` - True only when the A lookup itself failed.
    * ``ERRORTYPE`` - the exception class raised by the A lookup, if any.

    Args:
        host: Fully built query name (reversed address + blocklist zone).
        dnslist: Blocklist zone; the key of this thread's result entry.
        listed: Shared result dict; only ``listed[dnslist]`` is modified.
        resolver: Object exposing ``query(name, rdtype)``.
    """

    def __init__(self, host, dnslist, listed, resolver):
        Thread.__init__(self)
        self.host = host
        self.listed = listed
        self.dnslist = dnslist
        self.resolver = resolver

    def run(self):
        """Run the A (and, if listed, TXT) queries and record the outcome."""
        entry = self.listed[self.dnslist]
        try:
            host_record = self.resolver.query(self.host, "A")
        except NXDOMAIN:
            # NXDOMAIN is the regular "not listed" answer, not a failure.
            entry['ERROR'] = False
            entry['ERRORTYPE'] = NXDOMAIN
        except NoNameservers:
            # The original reused the time-out text here by mistake.
            self._record_failure(
                NoNameservers,
                "%s - All nameservers failed to answer the query." % self.host)
        except Timeout:
            self._record_failure(
                Timeout, "%s - The operation timed out." % self.host)
        except NameError:
            # Kept from the original best-effort handling.
            self._record_failure(NameError, "%s - NameError" % self.host)
        except NoAnswer:
            self._record_failure(
                NoAnswer,
                "%s - The response did not contain an answer to the question."
                % self.host)
        else:
            if len(host_record) > 0:
                entry['LISTED'] = True
                entry['HOST'] = host_record[0].address
                text = self._query_text()
                if text is not None:
                    entry['TEXT'] = text
            entry['ERROR'] = False

    def _record_failure(self, error_type, message):
        """Flag this blocklist's entry as failed and store *message*."""
        entry = self.listed[self.dnslist]
        entry['ERROR'] = True
        entry['ERRORTYPE'] = error_type
        entry['TEXT'] = message

    def _query_text(self):
        """Return the first TXT record as text, or None if unavailable.

        The TXT record is optional extra information: a failure here must
        not turn an already confirmed listing into an error.
        """
        try:
            text_record = self.resolver.query(self.host, "TXT")
        except (NXDOMAIN, NoNameservers, Timeout, NoAnswer):
            return None
        if len(text_record) == 0:
            return None
        return "\n".join(self._as_text(chunk) for chunk in text_record[0].strings)

    @staticmethod
    def _as_text(chunk):
        """Decode *chunk* when it is bytes that are not already ``str``."""
        # On Python 2 ``bytes is str`` so the chunk is returned untouched.
        if isinstance(chunk, bytes) and not isinstance(chunk, str):
            return chunk.decode("utf-8", "replace")
        return chunk

class RBLSearch(object):
    """Check one IPv4 address against a set of DNS blocklists.

    Args:
        lookup_host: Dotted IPv4 address to look up.
        rbls: Iterable of blocklist zone names to query.
    """

    def __init__(self, lookup_host, rbls):
        self.lookup_host = lookup_host
        self.rbls = rbls
        self._listed = None
        self.resolver = Resolver()
        self.resolver.timeout = 0.2
        self.resolver.lifetime = 1.0

    def search(self):
        """Query every blocklist once and return the result dict.

        The first call starts one ``Lookup`` thread per blocklist and waits
        for all of them; later calls return the cached dict. The dict maps
        ``'SEARCH_HOST'`` to the looked-up address and each blocklist name
        to that list's result entry.
        """
        if self._listed is None:
            # Blocklists are queried with the octets in reverse order.
            reversed_host = ".".join(reversed(self.lookup_host.split(".")))
            self._listed = {'SEARCH_HOST': self.lookup_host}
            threads = []
            for zone in self.rbls:
                self._listed[zone] = {'LISTED': False}
                query = Lookup("%s.%s" % (reversed_host, zone), zone,
                               self._listed, self.resolver)
                threads.append(query)
                query.start()
            for thread in threads:
                thread.join()
        return self._listed
    listed = property(search)

    def print_results(self):
        """Print a per-blocklist report for the looked-up address.

        Entries flagged with ``ERROR`` are skipped silently.
        """
        listed = self.listed
        print("")
        print("--- DNSBL Report for %s ---" % listed['SEARCH_HOST'])
        for key, entry in listed.items():
            if key == 'SEARCH_HOST' or entry.get('ERROR'):
                continue
            if entry['LISTED']:
                print("Results for %s: %s" % (key, entry['LISTED']))
                print("  + Host information: %s" % entry['HOST'])
            if 'TEXT' in entry:
                print("    + Additional information: %s" % entry['TEXT'])
            elif not entry['LISTED']:
                # "Not listed" depends on the listing flag, not on whether
                # a TXT record happened to be present.
                print("Not listed in %s" % (key))

# Whole-string dotted-quad check: raw string, escaped dots, anchored at the
# end so that names such as "1.2.3.4.example.com" are not taken for an IP.
_IPV4_PATTERN = re.compile(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\Z")


def task_listener_runcheck(gearman_worker, gearman_job):
    """Gearman task handler: run a blocklist check for one job.

    ``gearman_job.data`` must be a JSON object with the keys ``ip`` (an
    IPv4 address or a host name) and ``rbls`` (list of blocklist zones).

    Args:
        gearman_worker: The worker that received the job (unused).
        gearman_job: The job; only its ``data`` attribute is read.

    Returns:
        A fixed status string, also when the host could not be resolved.
    """
    jdata = json.loads(gearman_job.data)
    host = jdata['ip']
    rbls = jdata['rbls']
    ip = host

    print("Looking up: %s (please wait)" % host)
    if not _IPV4_PATTERN.match(host):
        # Not a literal address: resolve the name first.
        try:
            ip = socket.gethostbyname(host)
            print("Hostname %s resolved to ip %s" % (host, ip))
        except socket.error:
            print("Hostname %s can't be resolved" % host)
            ip = ""
    if ip:
        searcher = RBLSearch(ip, rbls)
        searcher.print_results()
    return "RunCheck was successfull"


# Create a worker bound to the Gearman job server at localhost:4730.
gm_worker = gearman.GearmanWorker(['localhost:4730'])

gm_worker.set_client_id('python-worker')
# Route "runcheck" jobs to task_listener_runcheck, then start processing.
# NOTE(review): work() presumably loops until interrupted - confirm.
gm_worker.register_task('runcheck', task_listener_runcheck)
gm_worker.work()
于 2014-06-29T14:40:30.817 回答