21

我正在使用 PyYaml 从我自己的 python 对象创建 Yaml 文档。例如我的对象:

class MyObj(object):
    name = "boby"
    age = 34

变成:

boby:
   age: 34

到现在为止还挺好。

但我还没有找到一种以编程方式向生成的 yaml 添加注释的方法,所以它看起来像:

boby:       # this is the name
   age: 34  # in years

查看 PyYaml 文档和代码,我发现没有办法这样做。

有什么建议么?

4

2 回答 2

5

您可能有一些 MyObj 类的代表,因为默认情况下print(yaml.dump(MyObj()))使用 PyYAML 转储 () 将为您提供:

!!python/object:__main__.MyObj {}

PyYAML 只能对所需输出中的注释做一件事:丢弃它们。如果您要重新读取所需的输出,您最终会得到一个包含 dict ( 的 dict ,因为没有标签信息{'boby': {'age': 34}},您将无法获得实例)MyObj()

我开发的 PyYAML 增强版 ( ruamel.yaml ) 可以读取带有注释的 YAML,保留注释并在转储时写入注释。如果你阅读你想要的输出,结果数据看起来(和行为)就像一个包含字典的字典,但实际上有更复杂的数据结构可以处理评论。但是,当 ruamel.yaml 要求您转储实例时,您可以创建该结构,MyObj并且如果您当时添加注释,您将获得所需的输出。

from __future__ import print_function

import sys
import ruamel.yaml
from ruamel.yaml.comments import CommentedMap


class MyObj():
    name = "boby"
    age = 34

    def convert_to_yaml_struct(self):
        x = CommentedMap()
        a = CommentedMap()
        x[data.name] = a
        x.yaml_add_eol_comment('this is the name', 'boby', 11)
        a['age'] = data.age
        a.yaml_add_eol_comment('in years', 'age', 11)
        return x

    @staticmethod
    def yaml_representer(dumper, data, flow_style=False):
        assert isinstance(dumper, ruamel.yaml.RoundTripDumper)
        return dumper.represent_dict(data.convert_to_yaml_struct())


ruamel.yaml.RoundTripDumper.add_representer(MyObj, MyObj.yaml_representer)

ruamel.yaml.round_trip_dump(MyObj(), sys.stdout)

哪个打印:

boby:      # this is the name
  age: 34  # in years

无需等待创建CommentedMap实例,直到您想要表示MyObj实例。例如,我将 makenameageinto 属性,从/上获取/设置值 approprate CommentedMap。这样,您可以在yaml_representer调用静态方法来表示MyObj实例之前更轻松地添加注释。

于 2015-04-15T12:14:09.797 回答
3

这是我想出的解决方案;它有点复杂,但不如 ruamel 复杂,因为它完全与普通的 PyYAML API 一起工作,并且没有往返注释(因此它不是这个其他问题的适当答案)。它可能总体上还没有那么健壮,因为我没有进行广泛的测试,但对于我的用例来说似乎已经足够了,也就是说,我希望 dicts/mappings 能够对整个映射以及每项评论。

我相信往返评论——在这种有限的背景下——也可以通过类似的方法实现,但我没有尝试过,因为它目前不是我拥有的用例。

最后,虽然这个解决方案没有实现向列表/序列中的项目添加每个项目的评论(因为这不是我目前需要的东西),但它可以很容易地扩展到这样做。

首先,就像在 ruamel 中一样,我们需要一种CommentedMapping类,它将评论与Mapping中的每个键相关联。有很多可能的方法。我的只是一个:

from collections.abc import Mapping, MutableMapping

class CommentedMapping(MutableMapping):
    def __init__(self, d, comment=None, comments={}):
        self.mapping = d
        self.comment = comment
        self.comments = comments

    def get_comment(self, *path):
        if not path:
            return self.comment

        # Look the key up in self (recursively) and raise a
        # KeyError or other execption if such a key does not
        # exist in the nested structure
        sub = self.mapping
        for p in path:
            if isinstance(sub, CommentedMapping):
                # Subvert comment copying
                sub = sub.mapping[p]
            else:
                sub = sub[p]

        comment = None
        if len(path) == 1:
            comment = self.comments.get(path[0])
        if comment is None:
            comment = self.comments.get(path)
        return comment

    def __getitem__(self, item):
        val = self.mapping[item]
        if (isinstance(val, (dict, Mapping)) and
                not isinstance(val, CommentedMapping)):
            comment = self.get_comment(item)
            comments = {k[1:]: v for k, v in self.comments.items()
                        if isinstance(k, tuple) and len(k) > 1 and k[0] == item}
            val = self.__class__(val, comment=comment, comments=comments)
        return val

    def __setitem__(self, item, value):
        self.mapping[item] = value

    def __delitem__(self, item):
        del self.mapping[item]
        for k in list(self.comments):
            if k == item or (isinstance(k, tuple) and k and k[0] == item):
                del self.comments[key]

    def __iter__(self):
        return iter(self.mapping)

    def __len__(self):
        return len(self.mapping)

    def __repr__(self):
        return f'{type(self).__name__}({self.mapping}, comment={self.comment!r}, comments={self.comments})'

这个类既有一个.comment属性,所以它可以携带映射的整体注释,以及一个.comments包含每个键注释的属性。它还允许通过将键路径指定为元组来为嵌套字典中的键添加注释。例如,允许为嵌套字典中comments={('c', 'd'): 'comment'}的键指定注释。从 获取项目时,如果项目的值是 dict/Mapping,它也会以保留其注释的方式包装在 a 中。这对于递归调用嵌套结构的 YAML 表示器很有用。'd''c'CommentedMappingCommentedMapping

接下来我们需要实现一个自定义的 YAML Dumper,它负责将对象序列化为 YAML 的整个过程。Dumper 是一个复杂的类,它由其他四个类 an Emitter、 a Serializer、 aRepresenter和 a组成Resolver。其中我们只需要实现前三个;Resolvers 更关心的是,例如隐含的标量如何1解析为正确的类型,以及确定各种值的默认标签。这里并没有真正涉及。

首先我们实现一个解析器。解析器负责识别不同的 Python 类型,并将它们映射到本机 YAML 数据结构/表示图中的相应节点。即,这些包括标量节点、序列节点和映射节点。例如,基Representer类包含一个 Python dicts 的表示器,它将它们转换为 a MappingNode(dict 中的每个项目又由一对ScalarNodes 组成,一个用于每个键,一个用于每个值)。

为了将注释附加到整个映射以及映射中的每个键,我们引入了两种新Node类型,它们不是 YAML 规范的正式组成部分:

from yaml.node import Node, ScalarNode, MappingNode

class CommentedNode(Node):
    """Dummy base class for all nodes with attached comments."""


class CommentedScalarNode(ScalarNode, CommentedNode):
    def __init__(self, tag, value, start_mark=None, end_mark=None, style=None,
                 comment=None):
        super().__init__(tag, value, start_mark, end_mark, style)
        self.comment = comment


class CommentedMappingNode(MappingNode, CommentedNode):
    def __init__(self, tag, value, start_mark=None, end_mark=None,
                 flow_style=None, comment=None, comments={}):
        super().__init__(tag, value, start_mark, end_mark, flow_style)
        self.comment = comment
        self.comments = comments

然后我们添加 a CommentedRepresenter,其中包含将 a 表示CommentedMapping为 a 的代码CommentedMappingNode。实际上,它只是重用基类的代码来表示映射,但将返回的转换MappingNodeCommentedMappingNode. 它还将每个键从 a 转换ScalarNode为 a CommentedscalarNode。我们基于SafeRepresenter这里,因为我不需要序列化任意 Python 对象:

from yaml.representer import SafeRepresenter

class CommentedRepresenter(SafeRepresenter):
    def represent_commented_mapping(self, data):
        node = super().represent_dict(data)
        comments = {k: data.get_comment(k) for k in data}
        value = []
        for k, v in node.value:
            if k.value in comments:
                k = CommentedScalarNode(
                        k.tag, k.value,
                        k.start_mark, k.end_mark, k.style,
                        comment=comments[k.value])
            value.append((k, v))

        node = CommentedMappingNode(
            node.tag,
            value,
            flow_style=False,  # commented dicts must be in block style
                               # this could be implemented differently for flow-style
                               # maps, but for my case I only want block-style, and
                               # it makes things much simpler
            comment=data.get_comment(),
            comments=comments
        )
        return node

    yaml_representers = SafeRepresenter.yaml_representers.copy()
    yaml_representers[CommentedMapping] = represent_commented_mapping

接下来我们需要实现一个子类Serializer序列化器负责遍历节点的表示图,并为每个节点输出一个或多个事件到发射,发射器是一个复杂(有时难以遵循)的状态机,它接收事件流并输出适当的 YAML每个事件的标记(例如,当接收到时,如果它是流式映射,MappingStartEvent将输出 a ,和/或为后续输出添加适当的缩进级别,直到对应的.{MappingEndEvent

重点是,新的序列化程序必须输出表示评论的事件,以便发射器知道何时需要发出评论。这只是通过添加 aCommentEvent并在每次在表示中遇到a CommentedMappingNodeor时发出它们来处理:CommentedScalarNode

from yaml import Event

class CommentEvent(yaml.Event):
    """
    Simple stream event representing a comment to be output to the stream.
    """
    def __init__(self, value, start_mark=None, end_mark=None):
        super().__init__(start_mark, end_mark)
        self.value = value

class CommentedSerializer(Serializer):
    def serialize_node(self, node, parent, index):
        if (node not in self.serialized_nodes and
                isinstance(node, CommentedNode) and
                not (isinstance(node, CommentedMappingNode) and
                     isinstance(parent, CommentedMappingNode))):
            # Emit CommentEvents, but only if the current node is not a
            # CommentedMappingNode nested in another CommentedMappingNode (in
            # which case we would have already emitted its comment via the
            # parent mapping)
            self.emit(CommentEvent(node.comment))

        super().serialize_node(node, parent, index)

接下来,Emitter需要对CommentEvents 进行子类化处理。这可能是最棘手的部分,因为正如我所写,发射器有点复杂和脆弱,并且以难以修改状态机的方式编写(我很想更清楚地重写它,但没有时间现在)。所以我尝试了许多不同的解决方案。

这里的关键方法是Emitter.emit处理事件流,并调用“状态”方法,这些方法根据机器所处的状态执行一些操作,而这些状态又受到流中出现的事件的影响。一个重要的认识是,在许多情况下,流处理在等待更多事件进入时被挂起——这就是该Emitter.need_more_events方法的职责。在某些情况下,在处理当前事件之前,需要先进入更多事件。例如,在MappingStartEvent至少需要在流上缓冲 3 个事件:第一个键/值对,以及可能的下一个键。发射器在开始格式化地图之前需要知道地图中是否有一个或多个项目,并且可能还需要知道第一个键/值对的长度。在处理当前事件之前所需的事件数在need_more_events方法中是硬编码的。

问题是这并没有考虑CommentEvent到事件流上现在可能存在的 s,这不应该影响其他事件的处理。因此Emitter.need_events,解释 s 存在的方法CommentEvent。例如,如果当前事件是MappingStartEvent,并且缓冲了 3 个后续事件,如果其中一个是CommentEvent我们无法计算的,那么我们至少需要 4 个事件(以防下一个是预期事件之一在映射中)。

最后,每次CommentEvent在流中遇到 a 时,我们都会强行跳出当前的事件处理循环来处理注释的写入,然后CommentEvent从流中弹出并继续,就好像什么都没发生一样。这是最终结果:

import textwrap
from yaml.emitter import Emitter

class CommentedEmitter(Emitter):
    def need_more_events(self):
        if self.events and isinstance(self.events[0], CommentEvent):
            # If the next event is a comment, always break out of the event
            # handling loop so that we divert it for comment handling
            return True
        return super().need_more_events()

    def need_events(self, count):
        # Hack-y: the minimal number of queued events needed to start
        # a block-level event is hard-coded, and does not account for
        # possible comment events, so here we increase the necessary
        # count for every comment event
        comments = [e for e in self.events if isinstance(e, CommentEvent)]
        return super().need_events(count + min(count, len(comments)))

    def emit(self, event):
        if self.events and isinstance(self.events[0], CommentEvent):
            # Write the comment, then pop it off the event stream and continue
            # as normal
            self.write_comment(self.events[0].value)
            self.events.pop(0)

        super().emit(event)

    def write_comment(self, comment):
        indent = self.indent or 0
        width = self.best_width - indent - 2  # 2 for the comment prefix '# '
        lines = ['# ' + line for line in wrap(comment, width)]

        for line in lines:
            if self.encoding:
                line = line.encode(self.encoding)
            self.write_indent()
            self.stream.write(line)
            self.write_line_break()

我还尝试了不同的方法来实现write_comment. 基Emitter类有它自己的方法 ( write_plain),它可以通过适当的缩进和换行处理将文本写入流。但是,它不够灵活,无法处理诸如注释之类的内容,其中每一行都需要以'# '. 我尝试的一种技术是猴子修补write_indent处理这种情况的方法,但最后它太丑陋了。我发现简单地使用 Python 的内置textwrap.wrap就足够了。

接下来,我们通过子类化现有SafeDumper但将我们的新类插入 MRO 来创建转储程序:

from yaml import SafeDumper

class CommentedDumper(CommentedEmitter, CommentedSerializer,
                      CommentedRepresenter, SafeDumper):
    """
    Extension of `yaml.SafeDumper` that supports writing `CommentedMapping`s with
    all comments output as YAML comments.
    """

这是一个示例用法:

>>> import yaml
>>> d = CommentedMapping({
...     'a': 1,
...     'b': 2,
...     'c': {'d': 3},
... }, comment='my commented dict', comments={
...     'a': 'a comment',
...     'b': 'b comment',
...     'c': 'long string ' * 44,
...     ('c', 'd'): 'd comment'
... })
>>> print(yaml.dump(d, Dumper=CommentedDumper))
# my commented dict
# a comment
a: 1
# b comment
b: 2
# long string long string long string long string long string long string long
# string long string long string long string long string long string long string
# long string long string long string long string long string long string long
# string long string long string long string long string long string long string
# long string long string long string long string long string long string long
# string long string long string long string long string long string long string
# long string long string long string long string long string
c:
  # d comment
  d: 3

我还没有非常广泛地测试这个解决方案,它可能仍然包含错误。当我更多地使用它并找到角落案例等时,我会更新它。

于 2020-03-03T13:21:13.673 回答