0

我有一项使用 avahi 发布的服务,可以在不同的机器上运行。

我有一些代码,可以检测并获取该机器的 IP,然后开始使用该服务。

我想实现自动化测试,所以需要以某种方式伪造几台机器,使 avahi 解析它们时得到不同的 IP 地址。

这可能吗?

class ReceiversManager(object):
    """Create one receiver per distinct address reported to ``on_new_system``.

    ``on_new_system`` is handed to an ``AvahiServiceDetector`` as its callback
    (presumably invoked once per resolved service -- confirm against the
    detector). For every address not seen before, a queue-creation request is
    sent through ``ZmqReq`` and a ``DirectRabbitReceiver`` is started and kept
    in ``self.receivers``.
    """

    # Port the ZmqReq queue-creation request is sent to.
    REQUEST_PORT = 5559

    def __init__(self, name, on_message_callback):
        """Set up logging and bookkeeping, then start service detection.

        Args:
            name: Passed unchanged to ``AvahiServiceDetector``.
            on_message_callback: Passed as ``on_receive_callback`` to every
                receiver created by this manager.
        """
        # NOTE: getLogger() without a name is the root logger, so this level
        # change is process-wide.
        self.logger = logging.getLogger()
        self.logger.setLevel(logging.DEBUG)
        self.name = name
        self.on_message_callback = on_message_callback
        # Bookkeeping must exist before the detector is created: the detector
        # receives on_new_system and may call it at any time afterwards.
        self.receivers = []
        self.addresses = []
        self.service_detector = AvahiServiceDetector(self.name, self.on_new_system)

    def on_new_system(self, name, address, port):
        """Create a receiver for ``address`` unless one was already created.

        Args:
            name: Unused here.
            address: Address of the discovered system; used as the
                de-duplication key.
            port: Forwarded to ``create_new_receiver``.
        """
        if address in self.addresses:
            return
        self.addresses.append(address)
        self.receivers.append(self.create_new_receiver(address, port))

    def create_new_receiver(self, address, port):
        """Request queue creation on ``address`` and start a receiver for it.

        Args:
            address: Host used both for the ZmqReq request and the receiver.
            port: Currently unused; the request always goes to
                ``REQUEST_PORT``.
                NOTE(review): confirm whether the discovered port should be
                used instead.

        Returns:
            The newly constructed ``DirectRabbitReceiver``.
        """
        self.logger.info('new address {}, new receiver created'.format(address))
        requester = ZmqReq(host='tcp://' + address, port=self.REQUEST_PORT)
        receiver_id = self.create_id()
        # The reply to the queue-creation request is not inspected.
        requester.execute(ArchiveCreator.CREATE_QUEUES_REQUEST + ' ' + receiver_id)
        recent_queue_name = ArchiveManager.create_queue_name(
            ArchiveManager.default_recent_queue_name, receiver_id)
        return DirectRabbitReceiver(host=address, queue_name=recent_queue_name,
                                    start_immediately=True,
                                    on_receive_callback=self.on_message_callback)

    @staticmethod
    def create_id():
        """Return a hex UUID5 derived from the baseboard and disk identifiers.

        The identifiers are read from the local machine, so the result is the
        same on every call as long as those two values do not change.
        """
        hardware_key = ReceiversManager.get_baseboard_id() + ReceiversManager.get_disk_id()
        return uuid.uuid5(uuid.NAMESPACE_DNS, hardware_key).hex

    @staticmethod
    def get_baseboard_id():
        """Return the 'Serial Number' field of the dmidecode baseboard section."""
        info = dmidecode.QuerySection('baseboard')
        # list() keeps the positional lookup working on Python 3, where
        # keys() returns a non-indexable view.
        # NOTE(review): taking the second key relies on the mapping's key
        # order -- confirm this is stable for the dmidecode result.
        section_key = list(info.keys())[1]
        return info[section_key]['data']['Serial Number']

    @staticmethod
    def get_disk_id():
        """Return the token two positions after 'Serial' in ``hdparm -I /dev/sda``.

        Presumably this is the value of a "Serial Number: <value>" line --
        verify against actual hdparm output.

        Raises:
            subprocess.CalledProcessError: If hdparm exits with a non-zero status.
            ValueError: If the output contains no 'Serial' token.
        """
        # universal_newlines=True yields text on both Python 2 and 3, so the
        # 'Serial' lookup below compares like with like.
        output = subprocess.check_output(['hdparm', '-I', '/dev/sda'],
                                         universal_newlines=True)
        tokens = output.split()
        return tokens[tokens.index('Serial') + 2]

class AvahiServicePublisher():
    """Publish a service through an Avahi entry group on the D-Bus system bus."""

    def __init__(self):
        """Connect to the Avahi server and create a fresh entry group."""
        self.group = None
        system_bus = dbus.SystemBus()

        # Proxy for the Avahi server object itself.
        server_object = system_bus.get_object(avahi.DBUS_NAME,
                                              avahi.DBUS_PATH_SERVER)
        avahi_server = dbus.Interface(server_object,
                                      avahi.DBUS_INTERFACE_SERVER)

        # EntryGroupNew() returns the object path of the new entry group.
        group_path = avahi_server.EntryGroupNew()
        group_object = system_bus.get_object(avahi.DBUS_NAME, group_path)
        self.group = dbus.Interface(group_object,
                                    avahi.DBUS_INTERFACE_ENTRY_GROUP)

    def publish(self, name, port, stype="_http._tcp", domain="", host="", text=""):
        """Add a service to the entry group and commit it.

        Args:
            name: Service name.
            port: Service port; converted to ``dbus.UInt16``.
            stype: Service type string.
            domain: Domain passed to ``AddService``.
            host: Host passed to ``AddService``.
            text: Value passed as the last ``AddService`` argument.
        """
        no_flags = dbus.UInt32(0)
        service_port = dbus.UInt16(port)
        entry_group = self.group
        entry_group.AddService(
            avahi.IF_UNSPEC,
            avahi.PROTO_UNSPEC,
            no_flags,
            name,
            stype,
            domain,
            host,
            service_port,
            text,
        )
        entry_group.Commit()

    def unpublish(self):
        """Reset the entry group."""
        entry_group = self.group
        entry_group.Reset()
4

0 回答 0