我的音频网络存在一些问题(我使用的是 IP 音频):音频流中有时会出现短暂的间隙。我有一台记录所有音频流的记录器,并用 Python 和 ffmpeg 编写了一个小脚本(还借用了一些 JavaScript 做可视化 :))来查找记录器 mp3 文件中的间隙。这总比没有好,但误检很多,手动检查所有结果非常烦人:脚本每小时会发现 20 到 200 个间隙,其中通常只有 1–10 个是由故障引起的,其余都只是歌曲、语音等内容中短暂的低电平片段。我正在寻找一种高级的机器学习/数据挖掘机制来自动检查这些间隙,只留下我真正想要的。我可以提供大量“真”间隙(带数据的数组)和“假”间隙来训练机器,之后只需把带有间隙的数据片段交给它,由它判断该片段是否像“真”间隙。要最快地实现这一点,您有什么建议?请注意,Python 是我唯一会写一点的语言。:/ 目前,间隙搜索的代码如下。它在单个 mp3 文件或包含 mp3 文件的文件夹中查找持续时间不小于 gap_min 且不大于 gap_max(单位为秒)的间隙。
import numpy as np
import subprocess, os, sys
import ntpath
# Largest absolute sample value (data is read as signed 16-bit) that still
# counts as "quiet" when searching for gaps.
tolerance=150#100
# Shortest quiet run, in seconds, that is reported as a gap
# (compared against a sample count after multiplying by sample_rate).
gap_min=0.007#0.021
# Longest quiet run, in seconds, that is reported as a gap.
gap_max=0.035#0.03
# Sample rate in Hz: passed to ffmpeg's -ar when decoding, and used to convert
# between sample positions and seconds.
sample_rate=48000
# Seconds of audio kept around each gap; the cut starts half of this before
# the gap and lasts this long plus the gap itself.
gap_file_duration=3#duration of the output mp3 files with gaps
# ffmpeg executable placed at the start of every command line.
ffmpeg_path=r'/Applications/ffmpeg'
# Folder for the temporary raw PCM (.bin) files. It is concatenated directly
# with a file name, so it must end with a path separator.
temp_folder=r'/Users/user/Downloads/'
# Root folder for HTML reports and cut mp3 excerpts. Also concatenated
# directly, so it must end with a path separator.
result_folder=r'/Users/user/Downloads/tmp/'
# Integrated loudness the audio is scaled to before analysis.
target_LUFS=-9#in LUFS
def samples_to_timecode(samples):
    """Format a sample position as ``HH:MM:SS.<samples>``.

    The field after the dot is the number of samples left over inside the
    current second (zero-padded to at least two digits); it is NOT a decimal
    fraction of a second.  Conversion uses the module-level ``sample_rate``.
    """
    hours = int(samples / (3600*sample_rate))
    minutes = int(samples / (60*sample_rate) % 60)
    seconds = int(samples / sample_rate % 60)
    leftover_samples = int(samples % sample_rate)
    return '%02d:%02d:%02d.%02d' % (hours, minutes, seconds, leftover_samples)
def timecode_to_samples(timecode):
    """Convert a timecode string into a sample position.

    Accepts ``HH:MM:SS:<samples>`` as well as ``HH:MM:SS.<samples>``, the
    form produced by ``samples_to_timecode``.  The last field is a sample
    count, not a decimal fraction.  Missing trailing fields are treated as
    absent, so ``HH:MM:SS`` also works.  Conversion uses the module-level
    ``sample_rate``.

    Raises:
        ValueError: if any field is not an integer.
    """
    weights = (3600*sample_rate, 60*sample_rate, sample_rate, 1)
    # Treat '.' like ':' so the output of samples_to_timecode round-trips;
    # previously int('SS.nn') raised ValueError.
    fields = timecode.replace('.', ':').split(':')
    return sum(weight * int(field) for weight, field in zip(weights, fields))
def seconds_to_timecode(seconds):
    """Format a duration in seconds as ``HH:MM:SS.ffffff``.

    The value is rounded to whole microseconds first and then split with
    integer arithmetic, so every field is zero-padded and the seconds field
    can never be printed as ``60.000000``.  Negative values are formatted
    from their magnitude with a leading ``-`` instead of wrapping around.

    Args:
        seconds: duration in seconds (int or float).

    Returns:
        The formatted timecode string; hours are not wrapped at 24.
    """
    total_us = int(round(abs(seconds) * 1000000))
    sign = '-' if (seconds < 0 and total_us) else ''
    whole_seconds, micro = divmod(total_us, 1000000)
    whole_minutes, secs = divmod(whole_seconds, 60)
    hours, minutes = divmod(whole_minutes, 60)
    return '{0}{1:02d}:{2:02d}:{3:02d}.{4:06d}'.format(sign, hours, minutes, secs, micro)
def analyze_bin_file(source_file):
    """Find quiet gaps in a raw PCM file, then delete the file.

    The file is read as signed 16-bit samples.  A sample is "quiet" when its
    value lies within ``[-tolerance, tolerance]``.  Every run of at least two
    consecutive quiet samples whose span (last index minus first index) is
    between ``gap_min * sample_rate`` and ``gap_max * sample_rate`` inclusive
    is reported, including runs that start at the first quiet sample or
    reach the end of the data.

    Args:
        source_file: path of the raw PCM file; it is removed before returning.

    Returns:
        A list with one entry per gap:
        ``[start_seconds, end_seconds, start_timecode, duration_seconds]``
        where the duration is rounded to three decimals.
    """
    print('Analizing start...')
    data = np.memmap(source_file, dtype='h', mode='r')
    quiet = np.flatnonzero(np.logical_and(data >= -tolerance, data <= tolerance))
    # Drop the mapping before deleting the file: removing a file that is
    # still memory-mapped fails on Windows.
    del data

    gaps_array = []
    if quiet.size:
        # Positions in `quiet` where the next quiet sample is not adjacent,
        # i.e. where one run ends and the next begins.
        breaks = np.flatnonzero(np.diff(quiet) != 1)
        run_first = quiet[np.concatenate(([0], breaks + 1))]
        run_last = quiet[np.concatenate((breaks, [quiet.size - 1]))]
        run_span = run_last - run_first
        wanted = ((run_span >= 1)
                  & (run_span >= gap_min*sample_rate)
                  & (run_span <= gap_max*sample_rate))
        for first, last in zip(run_first[wanted].tolist(), run_last[wanted].tolist()):
            gaps_array.append([float(first)/sample_rate,
                               float(last)/sample_rate,
                               samples_to_timecode(first),
                               round(float(last - first)/sample_rate, 3)])
            print('Gaps found: %s' % len(gaps_array))

    os.remove(source_file)
    print('Analizing done!')
    return gaps_array
def execute_cmd(cmd):
    """Run *cmd* through the shell and capture what it prints.

    Returns:
        A tuple ``(stdout, stderr, returncode)``; trailing whitespace is
        stripped from both captured streams.
    """
    pipes = dict(stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    process = subprocess.Popen(cmd, shell=True, **pipes)
    captured = process.communicate()
    return tuple(stream.rstrip() for stream in captured) + (process.returncode,)
def prepare_bin_file(source_file):
    """Decode *source_file* with ffmpeg into a raw PCM file for analysis.

    The audio is resampled to ``sample_rate``, scaled by the module-level
    ``volume`` factor, mixed down to one channel and written as headerless
    16-bit little-endian PCM to ``temp_folder`` under the source's base name
    plus ``.bin``.

    Returns:
        The path of the written file, or ``None`` if ffmpeg exits non-zero.
    """
    print('Start preparing binary file...')
    result_file_path = temp_folder + ntpath.basename(source_file) + '.bin'
    command = '{0} -i {1} -ar {4} -af volume={3} -ac 1 -map 0:a -c:a pcm_s16le -y -f data {2}'.format(
        ffmpeg_path, source_file, result_file_path, volume, sample_rate)
    exit_code = execute_cmd(command)[2]
    if exit_code != 0:
        print('Error occures while preparing!')
        return None
    print('Preparing done!')
    return result_file_path
def cut_gaps(mp3_file,gaps_array):
    """Cut one short mp3 excerpt around every gap using ffmpeg stream copy.

    Each excerpt starts ``gap_file_duration / 2`` seconds before the gap
    (never before the start of the file) and lasts ``gap_file_duration``
    plus the gap duration.  Excerpts are written to
    ``result_folder/<parent folder of mp3_file>/mp3/`` and named
    ``<basename>.<NNN>.mp3``.

    Args:
        mp3_file: path of the source mp3; it must contain at least one
            directory component.
        gaps_array: entries as returned by ``analyze_bin_file``; index 0 is
            the gap start in seconds and index 3 the gap duration in seconds.

    Returns:
        The list of excerpt file names (without directory), one per gap,
        whether or not ffmpeg succeeded for it.
    """
    print('Cutting file {0} start...'.format(mp3_file))
    result_files = []
    output_dir = result_folder + mp3_file.split(os.sep)[-2] + os.sep + 'mp3' + os.sep
    base_name = ntpath.basename(mp3_file)
    for gap_no, gap in enumerate(gaps_array):
        cut_name = base_name + '.{0:03d}'.format(gap_no) + '.mp3'
        # Clamp to the file start: a negative offset would otherwise be
        # formatted as a wrapped-around timecode and seek to the wrong place.
        start_seconds = max(0.0, gap[0] - float(gap_file_duration)/2)
        cut_start = seconds_to_timecode(start_seconds)
        cut_duration = gap_file_duration + gap[3]
        result = execute_cmd('{0} -y -i {1} -ss {2} -t {3} -c:a copy {4}'.format(ffmpeg_path,
                                                                                 mp3_file,
                                                                                 cut_start,
                                                                                 cut_duration,
                                                                                 output_dir + cut_name))
        result_files.append(cut_name)
        print('Cutting file {0} of {1} {2}'.format(gap_no+1, len(gaps_array), 'OK' if (result[-1] == 0) else 'ERROR'))
    print('Cutting done!')
    return result_files
def make_report(source_file, gaps_array, cut_files):
    """Write an HTML report with one wavesurfer.js player per gap.

    The report is written to
    ``result_folder/<parent folder of source_file>/<basename>.html`` and
    loads the excerpts from the ``./mp3/`` folder next to it.

    Args:
        source_file: path of the analysed mp3 file.
        gaps_array: entries as returned by ``analyze_bin_file``; index 2 is
            the start timecode and index 3 the duration in seconds.
        cut_files: excerpt file names, parallel to ``gaps_array``.
    """
    # Per-gap markup: waveform container, play button and zoom slider.
    player_template = (
        "\n"
        "<div id='waveform%(n)s'></div>\n"
        "<div style='text-align: center'>\n"
        "<button class='btn btn-primary' onclick='wavesurfer%(n)s.playPause()'>\n"
        "<i class='glyphicon glyphicon-play'></i>\n"
        "Play\n"
        "</button>\n"
        "<p class='row'>\n"
        "<div class='col-xs-1'>\n"
        "<i class='glyphicon glyphicon-zoom-in'></i>\n"
        "</div>\n"
        "<div class='col-xs-10'>\n"
        "<input id='slider%(n)s' type='range' min='1' max='4000' value='1' style='width: 100%%' />\n"
        "</div>\n"
        "<div class='col-xs-1'>\n"
        "<i class='glyphicon glyphicon-zoom-out'></i>\n"
        "</div>\n"
        "</p>\n"
        "</div>\n"
    )
    # Per-gap script: create the player, load the excerpt, wire up the slider.
    script_template = (
        "\n"
        "<script>\n"
        "var wavesurfer%(n)s = WaveSurfer.create({\n"
        "container: '#waveform%(n)s',\n"
        "waveColor: 'red',\n"
        "progressColor: 'purple'\n"
        "});\n"
        "wavesurfer%(n)s.load('./mp3/%(file)s');\n"
        "var slider%(n)s = document.querySelector('#slider%(n)s');\n"
        "slider%(n)s.oninput = function () {\n"
        "var zoomLevel = Number(slider%(n)s.value);\n"
        "wavesurfer%(n)s.zoom(zoomLevel);\n"
        "};\n"
        "</script>\n"
    )

    report_path = (result_folder + source_file.split(os.sep)[-2] + os.sep
                   + ntpath.basename(source_file) + '.html')
    # `with` guarantees the file is closed even if a write fails.
    with open(report_path, 'w') as report:
        report.write('<!doctype html><html lang=""><head></head><html><body><script src="https://cdnjs.cloudflare.com/ajax/libs/wavesurfer.js/1.1.2/wavesurfer.min.js"></script>')
        report.write('<div>File {0} analizing report<br>'.format(source_file))
        report.write('Searching parameters:<br>Gap minimum {0} second<br>Gap maximum {1} second<br>Tolerance value {2}<br>Analyze volume {3} dB<hr><hr></div>'.format(gap_min,
                                                                                                                                                                      gap_max,
                                                                                                                                                                      tolerance,
                                                                                                                                                                      volume))
        if gaps_array:
            for gap_no, gap in enumerate(gaps_array):
                report.write('<div>Gap No {0}<br>Gap start {1}<br>Gap duration {2}ms</div>'.format(gap_no,
                                                                                                   gap[2],
                                                                                                   gap[3]*1000))
                fields = {'n': gap_no, 'file': cut_files[gap_no]}
                report.write(player_template % fields)
                report.write(script_template % fields)
        else:
            report.write('<div>No gaps found!</div>')
        report.write('</body></html>')
def normalize_file(source):
    """Measure integrated loudness and return the gain needed to hit the target.

    Runs ffmpeg's ``ebur128`` filter on *source*, reads the ``I:`` value from
    the last ``Summary:`` section of ffmpeg's stderr and converts the
    difference to ``target_LUFS`` into a linear amplitude factor.

    Returns:
        The linear volume factor ``10 ** ((target_LUFS - I) / 20)``.  If
        ffmpeg fails or the loudness value cannot be found, ``'Error!'`` is
        printed and ``1.0`` (no gain change) is returned so the caller can
        carry on.
    """
    print('Analizing integrated loudness...')
    result = execute_cmd('{0} -nostats -i {1} -filter_complex ebur128 -f null -'.format(ffmpeg_path,
                                                                                      source))
    unity_gain = 1.0
    if result[-1] != 0:
        print('Error!')
        return unity_gain

    log = result[1]
    if isinstance(log, bytes):
        # Decode instead of str(): str() of bytes yields the repr on Python 3.
        log = log.decode('utf-8', 'replace')
    # Search the whole log rather than a fixed-size tail so a longer summary
    # cannot push 'Summary:' out of the window.
    summary_index = log.rfind('Summary:')
    summary_list = log[summary_index:].split() if summary_index >= 0 else []
    try:
        I_LUFS = float(summary_list[summary_list.index('I:') + 1])
    except (ValueError, IndexError):
        print('Error!')
        return unity_gain

    gainLog = -(I_LUFS - target_LUFS)
    volume = 10 ** (gainLog / 20)
    print('Analizing complete. I= {0} LUFS. Volume change value={1}.'.format(I_LUFS, volume))
    return volume
def run(source):
    """Analyse one mp3 file, or every ``.mp3`` file directly inside a folder.

    For a file: measure loudness (stored in the module-level ``volume``),
    decode it to raw PCM, search for gaps, cut an excerpt per gap and write
    the HTML report.  For a folder: call ``run`` on each contained ``.mp3``.
    Output goes to ``result_folder/<parent folder of source>/``.

    Args:
        source: path of an mp3 file or of a folder.  If it does not exist an
            error is printed and nothing else happens.
    """
    global volume
    if not (os.path.isfile(source) or os.path.isdir(source)):
        print('Error! File of folder {0} not found!'.format(source))
        return

    # Work with an absolute path so there is always a parent-folder
    # component to name the output folder after, even for a bare file name.
    source = os.path.abspath(source)
    path_list = source.split(os.sep)
    mp3_dir = result_folder + path_list[-2] + os.sep + 'mp3'
    if not os.path.isdir(mp3_dir):
        # makedirs also creates the report folder one level above.
        os.makedirs(mp3_dir)

    if os.path.isfile(source):
        volume = normalize_file(source)
        bin_file = prepare_bin_file(source)
        if bin_file is None:
            # Decoding failed (already reported); there is nothing to analyse.
            return
        gaps_array = analyze_bin_file(bin_file)
        cut_files = cut_gaps(source, gaps_array) if gaps_array else []
        make_report(source, gaps_array, cut_files)
    else:
        for name in os.listdir(source):
            if name.endswith(".mp3"):
                print(source, name)
                run(os.path.join(source, name))
# Default input (file or folder) used when no path is given on the command line.
src=r'/Users/user/Downloads/2016-08-02'
# The first command-line argument, when present, overrides the default.
run(sys.argv[1] if len(sys.argv) > 1 else src)
输出结果是带有波形的 HTML 文件,目前仅在 Firefox 浏览器中能正常显示。假间隙:假间隙示例 1。真间隙:真间隙示例 1。
更新:由于算法对音量电平非常敏感,我在分析数据之前加入了音量归一化。归一化不会应用到输出文件,只是在分析之前对数据做归一化处理。