硬核奶爸!用树莓派做个“智能婴儿监视器”:
大数据文摘出品
来源:Medium
编译:陈之炎
作为一名新晋奶爸和程序员,我在新身份中最常思考的问题就是“照料婴儿的工作真的无法自动化吗?”
当然,这也许能够实现,就算有给孩子换尿布的机器人(假设有足够多的父母同意在自己蹒跚学步的孩子身上测试这样的设备),愿意自动化照料婴儿的父母还真为数不多。
作为父亲,我首先意识到的事情是:婴儿很多时候都会在哭,即使我在家,也不可能总是能听到孩子的哭声。
通常,商用婴儿监视器可以填补这一空白,它们充当对讲机,让你在另一个房间也能听到婴儿的哭声。
但我很快意识到:商用婴儿监视器没有我想象中的理想设备智能:
它们只能充当一个传声筒:把声音从源头带到扬声器,却无法发现孩子哭声的含义; 当家长要去到另一个房间里时,相应要把扬声器带到另一个房间,无法在任何其他现有的音频设备上播放声音; 扬声器通常是低功率扬声器,无法连接到外部扬声器-这意味着,如果我在另一个房间播放音乐,我可能会听不到孩子的哭声,即便监控器和我在同一个房间也无法听到; 大多数扬声器都是在低功率无线电波上工作的,这意味着如果婴儿在他/她的房间里,而你必须走到楼下,它们才能工作。因此,我萌生了自制一个更好用的“智能婴儿监视器”的想法。
说干就干,我先给这个“智能婴儿监视器”定义了一些需要的功能。
它可以运行于价廉物美的树莓派(RaspberryPI)与USB麦克风。 当孩子开始/停止哭泣时,它应该检测到孩子的哭声,并通知我(理想情况下是在我的手机上),或者跟踪我仪表板上的数据点,或者运行相应的任务。它不应该是一个单纯的对讲器,简单地将声音从一个源传递到另一个兼容的设备。 它能够在扬声器,智能手机,电脑等设备上传输音频。 它不受源和扬声器之间距离的影响,无需在整个房子里将扬声器移来移去。 它还应该有一个摄像头,可以利用摄像头对孩子实时监控,当他一开始哭,我便可以抓拍到图片或婴儿床的短视频,以检查有什么不对劲。来看看一个新晋奶爸如何使用工程师的大脑和开源工具来完成这项任务吧。
采集音频样本首先,购买一块树莓派(RaspberryPi),在SD卡上烧录好Linux操作系统(建议使用RaspberryPI3或更高版本),运行Tensorflow模型。还可以购买一个与树莓派兼容的USB麦克风。
然后安装需要的相关项:
[sudo] apt-get install ffmpeg lame libatlas-base-dev alsa-utils
[sudo] pip3 install tensorflow
第一步,必须记录足够的音频样本,婴儿在什么时候哭,在什么时候不哭。稍后将利用这些样本来训练音频检测模型。
注意:在这个例子中,我将展示如何利用声音检测来识别婴儿的哭声,同样的精准程序可以用来检测任何其它类型的声音-只要它们足够长(例如:警报或邻居家的钻孔声)。
首先,查看音频输入设备:
arecord -l
在树莓派(RaspberryPI)上,得到以下输出(注意,有两个USB麦克风):
**** List of CAPTURE Hardware Devices ****
card 1: Device [USB PnP Sound Device], device 0: USB Audio [USB Audio]
Subdevices: 0/1
Subdevice #0: subdevice #0
card 2: Device_1 [USB PnP Sound Device], device 0: USB Audio [USB Audio]
Subdevices: 0/1
Subdevice #0: subdevice #0
我利用第二个麦克风来记录声音-即卡2,设备0。识别它的ALSA方法要么是hw:2,0(直接访问硬件设备),要么是plughw:2,0(如果需要的话,它会输入采样率和格式转换插件)。确保SD卡上有足够的空间,然后开始录制一些音频:
arecord -D plughw:2,0 -c 1 -f cd | lame - audio.mp3
和孩子在同一个房间里,记录几分钟或几个小时的音频-最好是长时间的沉默、婴儿哭声和其他与之无关的声音-,录音完成后按Ctrl-C。尽可能多的重复这个过程多次,在一天中的不同时刻或不同的日子里获取不同的音频样本。
标注音频示例一旦有了足够的音频样本,就可以把它们复制到电脑上来训练模型了-可以使用SCP复制文件,也可以直接从SD卡上复制。
把它们都存储在相同目录下,例如:~/datasets/sound-detect/audio。另外,为每个示例音频文件创建一个新文件夹,它包含一个音频文件(名为audio.mp3)和一个标注文件(名为labels.json),利用它来标记音频文件中的负/正音频段,原始数据集的结构如下:
~/datasets/sound-detect/audio
-> sample_1
-> audio.mp3
-> labels.json
-> sample_2
-> audio.mp3
-> labels.json
下面:标注录制的音频文件-如果它包含了孩子几个小时的哭声,可能会特别受虐。在你最喜欢的音频播放器或Audacity中打开每个数据集音频文件,并在每个示例目录中创建一个新的label.json文件。确定哭泣开始的确切时间和结束时间,并在labels.json中标注为time_string -> label的关键值结构。例:
{
"00:00": "negative",
"02:13": "positive",
"04:57": "negative",
"15:41": "positive",
"18:24": "negative"
}
在上面的例子中,00:00到02:12之间的所有音频段将被标记为负,02:13到04:56之间的所有音频段将被标记为正,以此类推。
生成数据集对所有的音频示例标注完成之后,接下来是生成数据集,最后将它输入到Tensorflow模型中去。首先,创建了一个名为micmon的通用库和一组用于声音监视的实用工具。然后,开始安装:
git clone git@github.com:/BlackLight/micmon.git
cd micmon
[sudo] pip3 install -r requirements.txt
[sudo] python3 setup.py build install
本模型设计基于音频的频率样本而非原始音频,因为,在这里我们想检测到一个特定的声音,这个声音有着特定的“频谱”标签,即:基频(或基频下降的窄带范围)和一组特定的谐波。这些谐波频率与基波之间的比率既不受振幅的影响(频率比恒定,与输入幅度无关),也不受相位的影响(无论何时开始记录,连续的声音都会有相同的频谱特征)。
这种与振幅和相位无关的特性使得这种方法更有可能训练出一个鲁棒的声音检测模型,而不是简单地将原始音频样本馈送到模型中。此外,该模型可以更简单(可以在不影响性能的情况下将多个频率分为一组,从而可以有效地实现降维),无论样本持续时间多长,该模型将50~ 100个频带作为输入值,一秒钟的原始音频通常包含44100个数据点,并且输入的长度随着样本的持续时间而增加,并且不太容易发生过拟合。
micmon能计算音频样本某些段的FFT(快速傅里叶变换),将结果频谱分为低通和高通滤波器的频带,并将结果保存到一组numpy压缩(.npz)文件中。可以通过在命令行上执行micmon-datagen命令来实现:
micmon-datagen \
--low 250 --high 2500 --bins 100 \
--sample-duration 2 --channels 1 \
~/datasets/sound-detect/audio ~/datasets/sound-detect/data
在上面的示例中,我们从存储在~/dataset/sound-detect/audio下的原始音频样本生成一个数据集,并将生成的频谱数据存储到~/datasets/sound-detect/data. –low和~/datasets/sound-detect/data. --high中, low和high分别表示最低和最高频率,最低频率的默认值为20Hz(人耳可闻的最低频率),最高频率的默认值为20kHz(健康的年轻人耳可闻的最高频率)。
通过对此范围做出限定,尽可能多地捕获希望检测到的其他类型的音频背景和无关谐波的声音。在本案例中, 250-2500赫兹的范围足以检测婴儿的哭声。
婴儿的哭声通常是高频的(歌剧女高音能达到的最高音符在1000赫兹左右),在这里设置了至少双倍的最高频率,以确保能获得足够高的谐波(谐波是更高的频率),但也不要将最高频率设得太高,以防止其他背景声音的谐波。我剪切掉了频率低于250赫兹的音频信号-婴儿的哭声不太可能发生在低频段,例如,可以打开一些positive音频样本,利用均衡器/频谱分析仪,检查哪些频率在positive样本中占主导地位,并将数据集集中在这些频率上。--bins指定了频率空间的组数(默认值:100),更大的数值意味着更高的频率分辨率/粒度,但如果太高,可能会使模型容易发生过度拟合。
脚本将原始音频分割成较小的段,并计算每个段的频谱标签。示例持续时间指定每个音频段有多长时间(默认:2秒)。对于持续时间较长的声音,取更大的值会起到更好的作用,但它同时会减少检测的时间,而且可能会在短音上失效。对于持续时间较短的声音,可以取较低的值,但捕获的片段可能没有足够的信息量来可靠地识别声音。
除了micmon-datagen脚本之外,也可以利用micmonAPI,编写脚本来生成数据集。例:
import os
from micmon.audio import AudioDirectory, AudioPlayer, AudioFile
from micmon.dataset import DatasetWriter
basedir = os.path.expanduser('~/datasets/sound-detect')
audio_dir = os.path.join(basedir, 'audio')
datasets_dir = os.path.join(basedir, 'data')
cutoff_frequencies = [250, 2500]
# Scan the base audio_dir for labelled audio samples
audio_dirs = AudioDirectory.scan(audio_dir)
# Save the spectrum information and labels of the samples to a
# different compressed file for each audio file.
for audio_dir in audio_dirs:
dataset_file = os.path.join(datasets_dir, os.path.basename(audio_dir.path) + '.npz')
print(f'Processing audio sample {audio_dir.path}')
with AudioFile(audio_dir) as reader, \
DatasetWriter(dataset_file,
low_freq=cutoff_frequencies[0],
high_freq=cutoff_frequencies[1]) as writer:
for sample in reader:
writer += sample
无论是使用micmon-datagen还是使用micmon Python API生成数据集,在过程结束时,应该在~/datasets/sound-detect/data目录下找到一堆.npz文件,每个标注后的音频原始文件对应一个数据集。之后,便可以利用这个数据集来训练神经网络进行声音检测。
训练模型micmon利用Tensorflow+Keras来定义和训练模型,有了PythonAPI,可以很容易地实现。例如:
import os
from tensorflow.keras import layers
from micmon.dataset import Dataset
from micmon.model import Model
# This is a directory that contains the saved .npz dataset files
datasets_dir = os.path.expanduser('~/datasets/sound-detect/data')
# This is the output directory where the model will be saved
model_dir = os.path.expanduser('~/models/sound-detect')
# This is the number of training epochs for each dataset sample
epochs = 2
# Load the datasets from the compressed files.
# 70% of the data points will be included in the training set,
# 30% of the data points will be included in the evaluation set
# and used to evaluate the performance of the model.
datasets = Dataset.scan(datasets_dir, validation_split=0.3)
labels = ['negative', 'positive']
freq_bins = len(datasets[0].samples[0])
# Create a network with 4 layers (one input layer, two intermediate layers and one output layer).
# The first intermediate layer in this example will have twice the number of units as the number
# of input units, while the second intermediate layer will have 75% of the number of
# input units. We also specify the names for the labels and the low and high frequency range
# used when sampling.
model = Model(
[
layers.Input(shape=(freq_bins,)),
layers.Dense(int(2 * freq_bins), activation='relu'),
layers.Dense(int(0.75 * freq_bins), activation='relu'),
layers.Dense(len(labels), activation='softmax'),
],
labels=labels,
low_freq=datasets[0].low_freq,
high_freq=datasets[0].high_freq
# Train the model
for epoch in range(epochs):
for i, dataset in enumerate(datasets):
print(f'[epoch {epoch+1}/{epochs}] [audio sample {i+1}/{len(datasets)}]')
model.fit(dataset)
evaluation = model.evaluate(dataset)
print(f'Validation set loss and accuracy: {evaluation}')
# Save the model
model.save(model_dir, overwrite=True)
运行此脚本后(在对模型的准确性感到满意后),可以在~/models/sound-detect目录下找保存的新模型。在我的这个例子中,我采集~5小时的声音就足够用了,通过定义一个较优的频率范围来训练模型,准确率大于98%。如果是在计算机上训练模型,只需将其复制到RaspberryPI,便可以准备进入下一步了。
利用模型进行预测这时候,制作一个脚本:利用以前训练过的模型,当孩子开始哭的时候,通知我们:
import os
from micmon.audio import AudioDevice
from micmon.model import Model
model_dir = os.path.expanduser('~/models/sound-detect')
model = Model.load(model_dir)
audio_system = 'alsa' # Supported: alsa and pulse
audio_device = 'plughw:2,0' # Get list of recognized input devices with arecord -l
with AudioDevice(audio_system, device=audio_device) as source:
for sample in source:
source.pause() # Pause recording while we process the frame
prediction = model.predict(sample)
print(prediction)
source.resume() # Resume recording
在RaspberryPI上运行脚本,并让它运行一段时间-如果在过去2秒内没有检测到哭声,它将在标准输出中打印negative,如果在过去2秒内检测到哭声否,则在标准输出中打印positive。
然而,如果孩子哭了,简单地将消息打印到标准输出中并没有太大作用-我们希望得到明确实时通知!
可以利用Platypush来实现这个功能。在本例中,我们将使用pushbullet集成在检测到cry时向我们的手机发送消息。接下来安装Redis(Platypush用于接收消息)和Platypush,利用HTTP和Pushbullet来集成:
[sudo] apt-get install redis-server
[sudo] systemctl start redis-server.service
[sudo] systemctl enable redis-server.service
[sudo] pip3 install 'platypush[http,pushbullet]'
将Pushbullet应用程序安装在智能手机上,到pushbullet.com上以获取API token。然后创建一个~/.config/platypush/config.yaml文件,该文件启用HTTP和Pushbullet集成:
backend.http:
enabled: True
pushbullet:
token: YOUR_TOKEN
接下来,对前面的脚本进行修改,不让它将消息打印到标准输出,而是触发一个可以被Platypush hook捕获的自定义事件CustomEvent:
#!/usr/bin/python3
import argparse
import logging
import os
import sys
from platypush import RedisBus
from platypush.message.event.custom import CustomEvent
from micmon.audio import AudioDevice
from micmon.model import Model
logger = logging.getLogger('micmon')
def get_args():
parser = argparse.ArgumentParser()
parser.add_argument('model_path', help='Path to the file/directory containing the saved Tensorflow model')
parser.add_argument('-i', help='Input sound device (e.g. hw:0,1 or default)', required=True, dest='sound_device')
parser.add_argument('-e', help='Name of the event that should be raised when a positive event occurs', required=True, dest='event_type')
parser.add_argument('-s', '--sound-server', help='Sound server to be used (available: alsa, pulse)', required=False, default='alsa', dest='sound_server')
parser.add_argument('-P', '--positive-label', help='Model output label name/index to indicate a positive sample (default: positive)', required=False, default='positive', dest='positive_label')
parser.add_argument('-N', '--negative-label', help='Model output label name/index to indicate a negative sample (default: negative)', required=False, default='negative', dest='negative_label')
parser.add_argument('-l', '--sample-duration', help='Length of the FFT audio samples (default: 2 seconds)', required=False, type=float, default=2., dest='sample_duration')
parser.add_argument('-r', '--sample-rate', help='Sample rate (default: 44100 Hz)', required=False, type=int, default=44100, dest='sample_rate')
parser.add_argument('-c', '--channels', help='Number of audio recording channels (default: 1)', required=False, type=int, default=1, dest='channels')
parser.add_argument('-f', '--ffmpeg-bin', help='FFmpeg executable path (default: ffmpeg)', required=False, default='ffmpeg', dest='ffmpeg_bin')
parser.add_argument('-v', '--verbose', help='Verbose/debug mode', required=False, action='store_true', dest='debug')
parser.add_argument('-w', '--window-duration', help='Duration of the look-back window (default: 10 seconds)', required=False, type=float, default=10., dest='window_length')
parser.add_argument('-n', '--positive-samples', help='Number of positive samples detected over the window duration to trigger the event (default: 1)', required=False, type=int, default=1, dest='positive_samples')
opts, args = parser.parse_known_args(sys.argv[1:])
return opts
def main():
args = get_args()
if args.debug:
logger.setLevel(logging.DEBUG)
model_dir = os.path.abspath(os.path.expanduser(args.model_path))
model = Model.load(model_dir)
window = []
cur_prediction = args.negative_label
bus = RedisBus()
with AudioDevice(system=args.sound_server,
device=args.sound_device,
sample_duration=args.sample_duration,
sample_rate=args.sample_rate,
channels=args.channels,
ffmpeg_bin=args.ffmpeg_bin,
debug=args.debug) as source:
for sample in source:
source.pause() # Pause recording while we process the frame
prediction = model.predict(sample)
logger.debug(f'Sample prediction: {prediction}')
has_change = False
if len(window) < args.window_length:
window += [prediction]
else:
window = window[1:] + [prediction]
positive_samples = len([pred for pred in window if pred == args.positive_label])
if args.positive_samples <= positive_samples and \
prediction == args.positive_label and \
cur_prediction != args.positive_label:
cur_prediction = args.positive_label
has_change = True
logging.info(f'Positive sample threshold detected ({positive_samples}/{len(window)})')
elif args.positive_samples > positive_samples and \
prediction == args.negative_label and \
cur_prediction != args.negative_label:
cur_prediction = args.negative_label
has_change = True
logging.info(f'Negative sample threshold detected ({len(window)-positive_samples}/{len(window)})')
if has_change:
evt = CustomEvent(subtype=args.event_type, state=prediction)
bus.post(evt)
source.resume() # Resume recording
if __name__ == '__main__':
main()
将上面的脚本保存为~/bin/micmon_detect.py。如果在滑动窗口时间内上检测到positive_samples样本(为了减少预测错误或临时故障引起的噪声),则脚本触发事件,并且它只会在当前预测从negative到positive的情况下触发事件。然后,它被分派给Platypush。对于其它不同的声音模型(不一定是哭泣婴儿),该脚本也是通用的,对应其它正/负标签、其它频率范围和其它类型的输出事件,这个脚本也能工作。
创建一个Platypush hook来对事件作出响应,并向设备发送通知。首先,创建 Platypush脚本目录:
mkdir -p ~/.config/platypush/scripts
cd ~/.config/platypush/scripts
# Define the directory as a module
touch __init__.py
# Create a script for the baby-cry events
vi babymonitor.py
babymonitor.py的内容为:
from platypush.context import get_plugin
from platypush.event.hook import hook
from platypush.message.event.custom import CustomEvent
@hook(CustomEvent, subtype='baby-cry', state='positive')
def on_baby_cry_start(event, **_):
pb = get_plugin('pushbullet')
pb.send_note(title='Baby cry status', body='The baby is crying!')
@hook(CustomEvent, subtype='baby-cry', state='negative')
def on_baby_cry_stop(event, **_):
pb = get_plugin('pushbullet')
pb.send_note(title='Baby cry status', body='The baby stopped crying - good job!')
为Platypush创建一个服务文件,并启动/启用服务,这样它就会在终端上启动:
mkdir -p ~/.config/systemd/user
wget -O ~/.config/systemd/user/platypush.service \
https://raw.githubusercontent.com/BlackLight/platypush/master/examples/systemd/platypush.service
systemctl --user start platypush.service
systemctl --user enable platypush.service
为婴儿监视器创建一个服务文件-如:
~/.config/systemd/user/babymonitor.service:
[Unit]
Description=Monitor to detect my baby's cries
After=network.target sound.target
[Service]
ExecStart=/home/pi/bin/micmon_detect.py -i plughw:2,0 -e baby-cry -w 10 -n 2 ~/models/sound-detect
Restart=always
RestartSec=10
[Install]
WantedBy=default.target
该服务将启动ALSA设备plughw:2,0上的麦克风监视器,如果在过去10秒内检测到至少2个positive 2秒样本,并且先前的状态为negative,则会触发state=positive事件;如果在过去10秒内检测到少于2个positive样本,并且先前的状态为positive,则state=negative。然后可以启动/启用服务:
systemctl --user start babymonitor.service
systemctl --user enable babymonitor.service
确认一旦婴儿开始哭泣,就会在手机上收到通知。如果没有收到通知,可以检查一下音频示例的标签、神经网络的架构和参数,或样本长度/窗口/频带等参数是否正确。
此外,这是一个相对基本的自动化例子-可以为它添加更多的自动化任务。例如,可以向另一个Platypush设备发送请求(例如:在卧室或客厅),用TTS插件大声提示婴儿在哭。还可以扩展micmon_detect.py脚本,以便捕获的音频样本也可以通过HTTP流-例如使用Flask包装器和ffmpeg进行音频转换。另一个有趣的用例是,当婴儿开始/停止哭泣时,将数据点发送到本地数据库(可以参考我先前关于“如何使用Platypush+PostgreSQL+Mosquitto+Grafana创建灵活和自我管理的仪表板”的文章https://towardsdatascience.com/how-to-build-your-home-infrastructure-for-data-collection-and-visualization-and-be-the-real-owner-af9b33723b0c):这是一组相当有用的数据,可以用来跟踪婴儿睡觉、醒着或需要喂食时的情况。虽然监测宝宝一直是我开发micmon的初衷,但是同样的程序也可以用来训练和检测其它类型声音的模型。最后,可以考虑使用一组良好的电源或锂电池组,这样监视器便可以便携化了。
安装宝贝摄像头有了一个好的音频馈送和检测方法之后,还可以添加一个视频馈送,以保持对孩子的监控。一开始,我 在RaspberryPI3上安装了一个PiCamera用于音频检测,后来,我发现这个配置相当不切实际。想想看:一个RaspberryPi 3、一个附加的电池包和一个摄像头,组合在一起会相当笨拙;如果你找到一个轻型相机,可以很容易地安装在支架或灵活的手臂上,而且可以四处移动,这样,无论他/她在哪里,都可以密切关注孩子。最终,我选择了体积较小的RaspberryPi Zero,它与PiCamera兼容,再配一个小电池。
婴儿监视器摄像头模块的第一个原型
同样,先插入一个烧录了与RaspberryPI兼容的操作系统的SD卡。然后在其插槽中插入一个与RaspberryPI兼容的摄像头,确保摄像头模块在raspi-config中启用,安装集成有PiCamera的Platypush:
[sudo] pip3 install 'platypush[http,camera,picamera]'
然后在~/.config/platypush/config.yaml:中添加相机配置:
camera.pi:
listen_port: 5001
在Platypush重新启动时检查此配置,并通过HTTP从摄像头获取快照:
wget http://raspberry-pi:8008/camera/pi/photo.jpg
或在浏览器中打开视频:
http://raspberry-pi:8008/camera/pi/video.mjpg
同样,当应用程序启动时,可以创建一个hook,该hook通过TCP/H264启动摄像头馈送:
mkdir -p ~/.config/platypush/scripts
cd ~/.config/platypush/scripts
touch __init__.py
vi camera.py
也可以通过VLC:播放视频。
vlc tcp/h264://raspberry-pi:5001
在手机上通过VLC应用程序或利用RPi Camera Viewer应用程序观看视频。
从想法到最后实现效果还不错,这也算是一个新晋奶爸从护理琐事中脱身的自我救赎吧。
原文链接:
https://towardsdatascience.com/create-your-own-smart-baby-monitor-with-a-raspberrypi-and-tensorflow-5b25713410ca
责任编辑: