只是一个注释 - 我发现了这个:
[Enthought-Dev] chaco 比 matplotlib 快
我记得在某处读过您应该实现 _downsample 方法,因为最佳算法取决于您收集的数据类型。
而且,除了该帖子中提到的_downsample实现之外,我找不到任何具有 _downsample 实现的示例,这不是独立的 - 我尝试并构建了一个独立的示例,包括在下面。
该示例基本上搞砸了拖动和缩放,(如果超出范围,绘图就会消失,或者在拖动移动时拉伸) - 它开始放大;但可以在 OP 中显示的范围内将其缩小 - 然后它会显示完全相同的绘图渲染问题。所以下采样本身不是解决方案,所以这可能是一个错误?
import numpy as np
import numpy.random as npr
from pprint import pprint
from traits.api import HasTraits, Instance
from chaco.api import Plot, ArrayPlotData, VPlotContainer
from traitsui.api import View, Item
from enable.component_editor import ComponentEditor
from chaco.tools.api import PanTool, BetterSelectingZoom
#
from chaco.api import BaseXYPlot, LinearMapper, AbstractPlotData
from enable.api import black_color_trait, LineStyle
from traits.api import Float, Enum, Int, Str, Trait, Event, Property, Array, cached_property, Bool, Dict
from chaco.abstract_mapper import AbstractMapper
from chaco.abstract_data_source import AbstractDataSource
from chaco.array_data_source import ArrayDataSource
from chaco.data_range_1d import DataRange1D
tlen = 112607
alr = npr.randint(0, 4000, tlen)
tx = np.arange(0.0, 30.0-0.00001, 30.0/tlen)
ty = np.arange(0, tlen, 1) % 10000 + alr
pprint(len(ty))
class ChacoTest(HasTraits):
container = Instance(VPlotContainer)
traits_view = View(
Item('container', editor=ComponentEditor(), show_label=False),
width=800, height=500, resizable=True,
title="Chaco Test"
)
downsampling_cutoff = Int(4)
def __init__(self):
super(ChacoTest, self).__init__()
pprint(ty)
self.plotdata = ArrayPlotData(x = tx, y = ty)
self.plotobj = TimeSeriesPlot(self.plotdata)
self.plotobj.setplotranges("x", "y")
self.container = VPlotContainer(self.plotobj, spacing=5, padding=5, bgcolor="lightgray")
self.plotobj.tools.append(PanTool(self.plotobj))
self.plotobj.overlays.append(BetterSelectingZoom(self.plotobj))
# decimate from:
# https://bitbucket.org/mjrosen/neurobehavior/raw/097ef3719d1263a8b303d29c31ab71b6e792ab04/cns/widgets/views/decimated_plot.py
def decimate(data, screen_width, downsampling_cutoff=4, mode='extremes'):
data_width = data.shape[-1]
downsample = np.floor((data_width/screen_width)/4.)
if downsample > downsampling_cutoff:
return globals()['decimate_'+mode](data, downsample)
else:
return data
def decimate_extremes(data, downsample):
last_dim = data.ndim
offset = data.shape[-1] % downsample
if data.ndim == 2:
shape = (len(data), -1, downsample)
else:
shape = (-1, downsample)
data = data[..., offset:].reshape(shape).copy()
data_min = data.min(last_dim)
data_max = data.max(last_dim)
return data_min, data_max
def decimate_mean(data, downsample):
offset = len(data) % downsample
if data.ndim == 2:
shape = (-1, downsample, data.shape[-1])
else:
shape = (-1, downsample)
data = data[offset:].reshape(shape).copy()
return data.mean(1)
# based on class from decimated_plot.py, also
# neurobehavior/cns/chaco_exts/timeseries_plot.py ;
# + some other code from chaco
class TimeSeriesPlot(BaseXYPlot):
color = black_color_trait
line_width = Float(1.0)
line_style = LineStyle
reference = Enum('most_recent', 'trigger')
traits_view = View("color@", "line_width")
downsampling_cutoff = Int(100)
signal_trait = "updated"
decimate_mode = Str('extremes')
ch_index = Trait(None, Int, None)
# Mapping of data names from self.data to their respective datasources.
datasources = Dict(Str, Instance(AbstractDataSource))
index_mapper = Instance(AbstractMapper)
value_mapper = Instance(AbstractMapper)
def __init__(self, data=None, **kwargs):
super(TimeSeriesPlot, self).__init__(**kwargs)
self._index_mapper_changed(None, self.index_mapper)
self.setplotdata(data)
self._plot_ui_info = None
return
def setplotdata(self, data):
if data is not None:
if isinstance(data, AbstractPlotData):
self.data = data
elif type(data) in (ndarray, tuple, list):
self.data = ArrayPlotData(data)
else:
raise ValueError, "Don't know how to create PlotData for data" \
"of type " + str(type(data))
def setplotranges(self, index_name, value_name):
self.index_name = index_name
self.value_name = value_name
index = self._get_or_create_datasource(index_name)
value = self._get_or_create_datasource(value_name)
if not(self.index_mapper):
imap = LinearMapper()#(range=self.index_range)
self.index_mapper = imap
if not(self.value_mapper):
vmap = LinearMapper()#(range=self.value_range)
self.value_mapper = vmap
if not(self.index_range): self.index_range = DataRange1D() # calls index_mapper
if not(self.value_range): self.value_range = DataRange1D()
self.index_range.add(index) # calls index_mapper!
self.value_range.add(value)
# now do it (right?):
self.index_mapper = LinearMapper(range=self.index_range)
self.value_mapper = LinearMapper(range=self.value_range)
def _get_or_create_datasource(self, name):
if name not in self.datasources:
data = self.data.get_data(name)
if type(data) in (list, tuple):
data = array(data)
if isinstance(data, np.ndarray):
if len(data.shape) == 1:
ds = ArrayDataSource(data, sort_order="none")
elif len(data.shape) == 2:
ds = ImageData(data=data, value_depth=1)
elif len(data.shape) == 3:
if data.shape[2] in (3,4):
ds = ImageData(data=data, value_depth=int(data.shape[2]))
else:
raise ValueError("Unhandled array shape in creating new plot: " \
+ str(data.shape))
elif isinstance(data, AbstractDataSource):
ds = data
else:
raise ValueError("Couldn't create datasource for data of type " + \
str(type(data)))
self.datasources[name] = ds
return self.datasources[name]
def get_screen_points(self):
self._gather_points()
return self._downsample()
def _data_changed(self):
self.invalidate_draw()
self._cache_valid = False
self._screen_cache_valid = False
self.request_redraw()
def _gather_points(self):
if not self._cache_valid:
range = self.index_mapper.range
#if self.reference == 'most_recent':
# values, t_lb, t_ub = self.get_recent_range(range.low, range.high)
#else:
# values, t_lb, t_ub = self.get_range(range.low, range.high, -1)
values, t_lb, t_ub = self.data[self.value_name][range.low:range.high], range.low, range.high
#if self.ch_index is None:
# self._cached_data = values
#else:
# #self._cached_data = values[:,self.ch_index]
self._cached_data = values
self._cached_data_bounds = t_lb, t_ub
self._cache_valid = True
self._screen_cache_valid = False
def _downsample(self):
if not self._screen_cache_valid:
val_pts = self._cached_data
screen_min, screen_max = self.index_mapper.screen_bounds
screen_width = screen_max-screen_min
values = decimate(val_pts, screen_width, self.downsampling_cutoff,
self.decimate_mode)
if type(values) == type(()):
n = len(values[0])
s_val_min = self.value_mapper.map_screen(values[0])
s_val_max = self.value_mapper.map_screen(values[1])
self._cached_screen_data = s_val_min, s_val_max
else:
s_val_pts = self.value_mapper.map_screen(values)
self._cached_screen_data = s_val_pts
n = len(values)
t = np.linspace(*self._cached_data_bounds, num=n)
t_screen = self.index_mapper.map_screen(t)
self._cached_screen_index = t_screen
self._screen_cache_valid = True
return [self._cached_screen_index, self._cached_screen_data]
def _render(self, gc, points):
idx, val = points
if len(idx) == 0:
return
gc.save_state()
gc.set_antialias(True)
gc.clip_to_rect(self.x, self.y, self.width, self.height)
gc.set_stroke_color(self.color_)
gc.set_line_width(self.line_width)
#gc.set_line_width(5)
gc.begin_path()
#if len(val) == 2:
if type(val) == type(()):
starts = np.column_stack((idx, val[0]))
ends = np.column_stack((idx, val[1]))
gc.line_set(starts, ends)
else:
gc.lines(np.column_stack((idx, val)))
gc.stroke_path()
self._draw_default_axes(gc)
gc.restore_state()
if __name__ == "__main__":
ChacoTest().configure_traits()