这是我想出的解决方案;它有点复杂,但不如 ruamel 复杂,因为它完全与普通的 PyYAML API 一起工作,并且没有往返注释(因此它不是这个其他问题的适当答案)。它可能总体上还没有那么健壮,因为我没有进行广泛的测试,但对于我的用例来说似乎已经足够了,也就是说,我希望 dicts/mappings 能够对整个映射以及每项评论。
我相信往返评论——在这种有限的背景下——也可以通过类似的方法实现,但我没有尝试过,因为它目前不是我拥有的用例。
最后,虽然这个解决方案没有实现向列表/序列中的项目添加每个项目的评论(因为这不是我目前需要的东西),但它可以很容易地扩展到这样做。
首先,就像在 ruamel 中一样,我们需要一种CommentedMapping
类,它将评论与Mapping中的每个键相关联。有很多可能的方法。我的只是一个:
from collections.abc import Mapping, MutableMapping
class CommentedMapping(MutableMapping):
def __init__(self, d, comment=None, comments={}):
self.mapping = d
self.comment = comment
self.comments = comments
def get_comment(self, *path):
if not path:
return self.comment
# Look the key up in self (recursively) and raise a
# KeyError or other execption if such a key does not
# exist in the nested structure
sub = self.mapping
for p in path:
if isinstance(sub, CommentedMapping):
# Subvert comment copying
sub = sub.mapping[p]
else:
sub = sub[p]
comment = None
if len(path) == 1:
comment = self.comments.get(path[0])
if comment is None:
comment = self.comments.get(path)
return comment
def __getitem__(self, item):
val = self.mapping[item]
if (isinstance(val, (dict, Mapping)) and
not isinstance(val, CommentedMapping)):
comment = self.get_comment(item)
comments = {k[1:]: v for k, v in self.comments.items()
if isinstance(k, tuple) and len(k) > 1 and k[0] == item}
val = self.__class__(val, comment=comment, comments=comments)
return val
def __setitem__(self, item, value):
self.mapping[item] = value
def __delitem__(self, item):
del self.mapping[item]
for k in list(self.comments):
if k == item or (isinstance(k, tuple) and k and k[0] == item):
del self.comments[key]
def __iter__(self):
return iter(self.mapping)
def __len__(self):
return len(self.mapping)
def __repr__(self):
return f'{type(self).__name__}({self.mapping}, comment={self.comment!r}, comments={self.comments})'
这个类既有一个.comment
属性,所以它可以携带映射的整体注释,以及一个.comments
包含每个键注释的属性。它还允许通过将键路径指定为元组来为嵌套字典中的键添加注释。例如,允许为嵌套字典中comments={('c', 'd'): 'comment'}
的键指定注释。从 获取项目时,如果项目的值是 dict/Mapping,它也会以保留其注释的方式包装在 a 中。这对于递归调用嵌套结构的 YAML 表示器很有用。'd'
'c'
CommentedMapping
CommentedMapping
接下来我们需要实现一个自定义的 YAML Dumper,它负责将对象序列化为 YAML 的整个过程。Dumper 是一个复杂的类,它由其他四个类 an Emitter
、 a Serializer
、 aRepresenter
和 a组成Resolver
。其中我们只需要实现前三个;Resolver
s 更关心的是,例如隐含的标量如何1
解析为正确的类型,以及确定各种值的默认标签。这里并没有真正涉及。
首先我们实现一个解析器。解析器负责识别不同的 Python 类型,并将它们映射到本机 YAML 数据结构/表示图中的相应节点。即,这些包括标量节点、序列节点和映射节点。例如,基Representer
类包含一个 Python dict
s 的表示器,它将它们转换为 a MappingNode
(dict 中的每个项目又由一对ScalarNode
s 组成,一个用于每个键,一个用于每个值)。
为了将注释附加到整个映射以及映射中的每个键,我们引入了两种新Node
类型,它们不是 YAML 规范的正式组成部分:
from yaml.node import Node, ScalarNode, MappingNode
class CommentedNode(Node):
"""Dummy base class for all nodes with attached comments."""
class CommentedScalarNode(ScalarNode, CommentedNode):
def __init__(self, tag, value, start_mark=None, end_mark=None, style=None,
comment=None):
super().__init__(tag, value, start_mark, end_mark, style)
self.comment = comment
class CommentedMappingNode(MappingNode, CommentedNode):
def __init__(self, tag, value, start_mark=None, end_mark=None,
flow_style=None, comment=None, comments={}):
super().__init__(tag, value, start_mark, end_mark, flow_style)
self.comment = comment
self.comments = comments
然后我们添加 a CommentedRepresenter
,其中包含将 a 表示CommentedMapping
为 a 的代码CommentedMappingNode
。实际上,它只是重用基类的代码来表示映射,但将返回的转换MappingNode
为CommentedMappingNode
. 它还将每个键从 a 转换ScalarNode
为 a CommentedscalarNode
。我们基于SafeRepresenter
这里,因为我不需要序列化任意 Python 对象:
from yaml.representer import SafeRepresenter
class CommentedRepresenter(SafeRepresenter):
def represent_commented_mapping(self, data):
node = super().represent_dict(data)
comments = {k: data.get_comment(k) for k in data}
value = []
for k, v in node.value:
if k.value in comments:
k = CommentedScalarNode(
k.tag, k.value,
k.start_mark, k.end_mark, k.style,
comment=comments[k.value])
value.append((k, v))
node = CommentedMappingNode(
node.tag,
value,
flow_style=False, # commented dicts must be in block style
# this could be implemented differently for flow-style
# maps, but for my case I only want block-style, and
# it makes things much simpler
comment=data.get_comment(),
comments=comments
)
return node
yaml_representers = SafeRepresenter.yaml_representers.copy()
yaml_representers[CommentedMapping] = represent_commented_mapping
接下来我们需要实现一个子类Serializer
。序列化器负责遍历节点的表示图,并为每个节点输出一个或多个事件到发射器,发射器是一个复杂(有时难以遵循)的状态机,它接收事件流并输出适当的 YAML每个事件的标记(例如,当接收到时,如果它是流式映射,MappingStartEvent
将输出 a ,和/或为后续输出添加适当的缩进级别,直到对应的.{
MappingEndEvent
重点是,新的序列化程序必须输出表示评论的事件,以便发射器知道何时需要发出评论。这只是通过添加 aCommentEvent
并在每次在表示中遇到a CommentedMappingNode
or时发出它们来处理:CommentedScalarNode
from yaml import Event
class CommentEvent(yaml.Event):
"""
Simple stream event representing a comment to be output to the stream.
"""
def __init__(self, value, start_mark=None, end_mark=None):
super().__init__(start_mark, end_mark)
self.value = value
class CommentedSerializer(Serializer):
def serialize_node(self, node, parent, index):
if (node not in self.serialized_nodes and
isinstance(node, CommentedNode) and
not (isinstance(node, CommentedMappingNode) and
isinstance(parent, CommentedMappingNode))):
# Emit CommentEvents, but only if the current node is not a
# CommentedMappingNode nested in another CommentedMappingNode (in
# which case we would have already emitted its comment via the
# parent mapping)
self.emit(CommentEvent(node.comment))
super().serialize_node(node, parent, index)
接下来,Emitter
需要对CommentEvent
s 进行子类化处理。这可能是最棘手的部分,因为正如我所写,发射器有点复杂和脆弱,并且以难以修改状态机的方式编写(我很想更清楚地重写它,但没有时间现在)。所以我尝试了许多不同的解决方案。
这里的关键方法是Emitter.emit
处理事件流,并调用“状态”方法,这些方法根据机器所处的状态执行一些操作,而这些状态又受到流中出现的事件的影响。一个重要的认识是,在许多情况下,流处理在等待更多事件进入时被挂起——这就是该Emitter.need_more_events
方法的职责。在某些情况下,在处理当前事件之前,需要先进入更多事件。例如,在MappingStartEvent
至少需要在流上缓冲 3 个事件:第一个键/值对,以及可能的下一个键。发射器在开始格式化地图之前需要知道地图中是否有一个或多个项目,并且可能还需要知道第一个键/值对的长度。在处理当前事件之前所需的事件数在need_more_events
方法中是硬编码的。
问题是这并没有考虑CommentEvent
到事件流上现在可能存在的 s,这不应该影响其他事件的处理。因此Emitter.need_events
,解释 s 存在的方法CommentEvent
。例如,如果当前事件是MappingStartEvent
,并且缓冲了 3 个后续事件,如果其中一个是CommentEvent
我们无法计算的,那么我们至少需要 4 个事件(以防下一个是预期事件之一在映射中)。
最后,每次CommentEvent
在流中遇到 a 时,我们都会强行跳出当前的事件处理循环来处理注释的写入,然后CommentEvent
从流中弹出并继续,就好像什么都没发生一样。这是最终结果:
import textwrap
from yaml.emitter import Emitter
class CommentedEmitter(Emitter):
def need_more_events(self):
if self.events and isinstance(self.events[0], CommentEvent):
# If the next event is a comment, always break out of the event
# handling loop so that we divert it for comment handling
return True
return super().need_more_events()
def need_events(self, count):
# Hack-y: the minimal number of queued events needed to start
# a block-level event is hard-coded, and does not account for
# possible comment events, so here we increase the necessary
# count for every comment event
comments = [e for e in self.events if isinstance(e, CommentEvent)]
return super().need_events(count + min(count, len(comments)))
def emit(self, event):
if self.events and isinstance(self.events[0], CommentEvent):
# Write the comment, then pop it off the event stream and continue
# as normal
self.write_comment(self.events[0].value)
self.events.pop(0)
super().emit(event)
def write_comment(self, comment):
indent = self.indent or 0
width = self.best_width - indent - 2 # 2 for the comment prefix '# '
lines = ['# ' + line for line in wrap(comment, width)]
for line in lines:
if self.encoding:
line = line.encode(self.encoding)
self.write_indent()
self.stream.write(line)
self.write_line_break()
我还尝试了不同的方法来实现write_comment
. 基Emitter
类有它自己的方法 ( write_plain
),它可以通过适当的缩进和换行处理将文本写入流。但是,它不够灵活,无法处理诸如注释之类的内容,其中每一行都需要以'# '
. 我尝试的一种技术是猴子修补write_indent
处理这种情况的方法,但最后它太丑陋了。我发现简单地使用 Python 的内置textwrap.wrap
就足够了。
接下来,我们通过子类化现有SafeDumper
但将我们的新类插入 MRO 来创建转储程序:
from yaml import SafeDumper
class CommentedDumper(CommentedEmitter, CommentedSerializer,
CommentedRepresenter, SafeDumper):
"""
Extension of `yaml.SafeDumper` that supports writing `CommentedMapping`s with
all comments output as YAML comments.
"""
这是一个示例用法:
>>> import yaml
>>> d = CommentedMapping({
... 'a': 1,
... 'b': 2,
... 'c': {'d': 3},
... }, comment='my commented dict', comments={
... 'a': 'a comment',
... 'b': 'b comment',
... 'c': 'long string ' * 44,
... ('c', 'd'): 'd comment'
... })
>>> print(yaml.dump(d, Dumper=CommentedDumper))
# my commented dict
# a comment
a: 1
# b comment
b: 2
# long string long string long string long string long string long string long
# string long string long string long string long string long string long string
# long string long string long string long string long string long string long
# string long string long string long string long string long string long string
# long string long string long string long string long string long string long
# string long string long string long string long string long string long string
# long string long string long string long string long string
c:
# d comment
d: 3
我还没有非常广泛地测试这个解决方案,它可能仍然包含错误。当我更多地使用它并找到角落案例等时,我会更新它。