我有一项使用 avahi 发布的服务,可以在不同的机器上运行。
我有一些代码,可以检测并获取该机器的 IP,然后开始使用该服务。
我现在想把测试自动化,所以我需要以某种方式伪造几台机器,使它们在被 avahi 解析时具有不同的 IP 地址。
这可能吗?
class ReceiversManager(object):
    """Create one receiver per distinct address reported by service discovery.

    An ``AvahiServiceDetector`` is constructed with ``on_new_system`` as its
    callback. Every address seen for the first time gets a receiver built by
    ``create_new_receiver``; repeated addresses are ignored.
    """

    def __init__(self, name, on_message_callback):
        """Set up logging, bookkeeping lists and the service detector.

        Args:
            name: Passed through to ``AvahiServiceDetector``.
            on_message_callback: Handed to every receiver as its
                ``on_receive_callback``.
        """
        # getLogger() without a name returns the root logger, so the level
        # change below is process-wide.
        self.logger = logging.getLogger()
        self.logger.setLevel(logging.DEBUG)
        self.name = name
        self.on_message_callback = on_message_callback
        # The bookkeeping lists must exist before the detector is created:
        # the detector receives on_new_system as its callback and that
        # callback reads both lists.
        self.receivers = []
        self.addresses = []
        self.service_detector = AvahiServiceDetector(self.name, self.on_new_system)
        # NOTE(review): the level is applied a second time after the detector
        # has been constructed. Kept as-is because the detector's effect on
        # the root logger is not visible here; possibly redundant.
        self.logger.setLevel(logging.DEBUG)

    def on_new_system(self, name, address, port):
        """Register a receiver for ``address`` unless one is already known.

        Args:
            name: Name reported by the detector (not used here).
            address: Address of the discovered system; used as the
                de-duplication key.
            port: Port reported by the detector, forwarded to
                ``create_new_receiver``.
        """
        if address not in self.addresses:
            # The address is recorded before the receiver is created, so an
            # exception while creating the receiver leaves the address marked
            # as known.
            self.addresses.append(address)
            self.receivers.append(self.create_new_receiver(address, port))

    def create_new_receiver(self, address, port):
        """Request queue creation on ``address`` and return a receiver for it.

        Args:
            address: Host to contact.
            port: Accepted for interface compatibility but not used; the
                request is always sent to port 5559.
                NOTE(review): confirm whether the discovered port should be
                used instead.

        Returns:
            A ``DirectRabbitReceiver`` bound to the recent-queue name derived
            from this manager's id, started immediately.
        """
        self.logger.info('new address {}, new receiver created'.format(address))
        requester = ZmqReq(host='tcp://' + address, port=5559)
        receiver_id = self.create_id()
        # The reply to the create-queues request is not inspected.
        requester.execute(ArchiveCreator.CREATE_QUEUES_REQUEST + ' ' + receiver_id)
        recent_queue_name = ArchiveManager.create_queue_name(
            ArchiveManager.default_recent_queue_name, receiver_id)
        return DirectRabbitReceiver(host=address, queue_name=recent_queue_name,
                                    start_immediately=True,
                                    on_receive_callback=self.on_message_callback)

    @staticmethod
    def create_id():
        """Return a hex UUIDv5 derived from the baseboard and disk serials.

        The serials are read on the machine running this code, so the value
        is the same for every receiver created by this process.
        """
        hardware_key = ReceiversManager.get_baseboard_id() + ReceiversManager.get_disk_id()
        return uuid.uuid5(uuid.NAMESPACE_DNS, hardware_key).hex

    @staticmethod
    def get_baseboard_id():
        """Return the 'Serial Number' from the dmidecode 'baseboard' section."""
        info = dmidecode.QuerySection('baseboard')
        # list(...) keeps the lookup working where keys() returns a view
        # rather than a list.
        # NOTE(review): this picks the second key of the mapping, which
        # depends on the mapping's key order — verify against dmidecode.
        section_key = list(info.keys())[1]
        return info[section_key]['data']['Serial Number']

    @staticmethod
    def get_disk_id():
        """Return the serial number that ``hdparm -I /dev/sda`` reports.

        Raises:
            subprocess.CalledProcessError: If hdparm exits with a non-zero
                status.
            ValueError: If the output contains no ``Serial`` token.
        """
        # universal_newlines=True makes check_output return text, so the
        # token comparison in the parser also works on Python 3.
        output = subprocess.check_output(['hdparm', '-I', '/dev/sda'],
                                         universal_newlines=True)
        return ReceiversManager._parse_disk_serial(output)

    @staticmethod
    def _parse_disk_serial(output):
        """Return the token two positions after the first 'Serial' token.

        In a line such as ``Serial Number:  WD-12345`` the tokens are
        ``Serial``, ``Number:`` and the serial itself.
        """
        tokens = output.split()
        return tokens[tokens.index('Serial') + 2]
class AvahiServicePublisher():
    """Publish a service through an Avahi entry group reached over D-Bus."""

    def __init__(self):
        """Connect to the system bus and create a new Avahi entry group."""
        self.group = None
        system_bus = dbus.SystemBus()

        # Proxy for the Avahi server object, used only to allocate the group.
        server_object = system_bus.get_object(avahi.DBUS_NAME,
                                              avahi.DBUS_PATH_SERVER)
        avahi_server = dbus.Interface(server_object,
                                      avahi.DBUS_INTERFACE_SERVER)

        # EntryGroupNew() returns the object path of the new entry group.
        group_object = system_bus.get_object(avahi.DBUS_NAME,
                                             avahi_server.EntryGroupNew())
        self.group = dbus.Interface(group_object,
                                    avahi.DBUS_INTERFACE_ENTRY_GROUP)

    def publish(self, name, port, stype="_http._tcp", domain="", host="", text=""):
        """Add a service to the entry group and commit it.

        Args:
            name: Service name.
            port: Service port; converted to ``dbus.UInt16``.
            stype: Service type.
            domain: Domain argument passed to ``AddService`` unchanged.
            host: Host argument passed to ``AddService`` unchanged.
            text: TXT argument passed to ``AddService`` unchanged.
        """
        service_args = (
            avahi.IF_UNSPEC,
            avahi.PROTO_UNSPEC,
            dbus.UInt32(0),  # flags
            name,
            stype,
            domain,
            host,
            dbus.UInt16(port),
            text,
        )
        self.group.AddService(*service_args)
        self.group.Commit()

    def unpublish(self):
        """Call ``Reset()`` on the entry group."""
        self.group.Reset()