RoboTry

ROSを使ってロボットを動かすぞ

Raspberry Pi でラズパイマウスを動かしてみた

メインはROSの話です。やってることはタイトルのとおりです。

やってることはネタですが、学ぶ場面が多々あって、まとめてみました。

ラズパイマウスを持ってなくても参考になると思います。

なにをするか

まずはこちらをご覧ください。

この動画では

  1. Raspberry Piが加速度センサから角度(傾き)情報を受けとる。
  2. Raspberry Piが角度情報をもとに速度コマンドを生成する。
  3. 速度コマンドをラズパイマウスとPCが受信する。
  4. PCは速度コマンドを描画する。
  5. ラズパイマウスはモータを回して走行する。

ということをしています。

絵を描くとこんな感じです。

f:id:macakasit:20180602202200p:plain
図.やってることの概要

この記事には

  • 加速度センサADXL345とi2cで通信する方法(詳細は省く)
  • Raspberry Pi 2台とPC 1台がROSで通信する方法

を書いています。

GitHubソースコードを公開しました。 ものが揃えば動画と同じように動かせると思います。

用意するもの

筆者の環境を記載します。 全く同じものを揃える必要はないです。

事前準備

この記事の内容を実施するための準備です ここでは説明を省略します。

  • masterpc、raspimouse、raspi_homeがそれぞれssh接続できる
  • ROSの簡単な知識を持っている(ROSのTutorial、もしくはROS本を9章まで進めてたらOK)
  • raspi_homeはi2cが使用できる(参考)

raspi_homeの回路

はじめにraspi_homeの回路を組み立てます

回路図は以下のとおりです。Fritzingで作成しました。

f:id:macakasit:20180603150928p:plain
図.raspi_home回路

組み立てるとこんな感じです。

f:id:macakasit:20180603150336j:plain
図.raspi_home組み立て後

加速度センサの値をラズパイマウスの速度コマンドに変換

回路ができたので、次はプログラムを書きます。

プログラムはGitHubに公開しています。

プログラムの中身はこのようになっています。

f:id:macakasit:20180603175257p:plain
図.プログラムの中身

加速度センサの値取得

加速度センサADXL345の詳細な使い方は、データシートを読んでください。

ADXL345からは、それぞれx, y, z軸の加速度を取得できます。また、x, y軸については加速度が蓄積されているため、角度の変化量として受け取れます。

別サイト*1 を参考にして、以下のようなプログラムを作成しました。 GitHub

ADXL345は、値の取得周期や感度、データ長などパラメータをセッティングできますが、参考サイトの設定から変更しません

adxl345.py

#!/usr/bin/env python
#encoding: utf8

import RPi.GPIO as GPIO
import os
import smbus
import time
import math


class ADXL345():

    def __init__(self):
        self.dev_addr = 0x1d
        my_bus = ""

        if GPIO.RPI_INFO['P1_REVISION'] == 1:
            my_bus = 0
        else:
            my_bus = 1

        self.sm_bus = smbus.SMBus(my_bus)

        self._prev_value = {'x':0, 'y':0, 'z':0}
        self._addr = {'x':0x32, 'y':0x34, 'z':0x36}
        self._axes = ['x', 'y', 'z']


    def setUp(self):
        self.sm_bus.write_byte_data(self.dev_addr, 0x2c, 0x0b) # bandwidth rate 200 Hz
        self.sm_bus.write_byte_data(self.dev_addr, 0x31, 0x00) # DATA_FORMAT 10bit 2g
        self.sm_bus.write_byte_data(self.dev_addr, 0x38, 0x00) # FIFO_CTL OFF
        self.sm_bus.write_byte_data(self.dev_addr, 0x2d, 0x08) # POWER_CTL Enable

    def _get_value(self, axis):
        addr = self._addr[axis]

        try:
            # データは2の補数で表現されている
            hi_byte = self.sm_bus.read_byte_data(self.dev_addr, addr+1)
            lo_byte = self.sm_bus.read_byte_data(self.dev_addr, addr)
            full_byte = hi_byte << 8 | lo_byte

            signed_value = -(full_byte & 0x8000) | (full_byte & 0x7FFF)

            # 外れ値を除去。外れ値の発生原因は未調査
            if self._is_outlier(axis, signed_value):
                return self._prev_value[axis]

            self._prev_value[axis] = signed_value
            return signed_value
        except IOError:
            # i2cバスのケーブルが抜けるとIOErrorが発生するので、
            # その場合は前回の値を出力する
            return self._prev_value[axis]


    def get_value_x(self):
        return self._get_value('x')

    def get_value_y(self):
        return self._get_value('y')

    def get_value_z(self):
        return self._get_value('z')

    def get_value(self, axis):
        if axis in self._axes:
            return self._get_value(axis)
        else:
            return 0

    def _is_outlier(self, axis, current_value):
        diff = math.fabs(self._prev_value[axis] - current_value)

        # 値がしきい値より大きく変動したら外れ値とみなす。
        # 外れ値の発生原因は未調査
        if diff > 200:
            return True
        else:
            return False



if __name__ == '__main__':
    adxl = ADXL345()
    adxl.setUp()

    for i in range(1000):
        x = adxl.get_value_x()
        y = adxl.get_value_y()
        z = adxl.get_value_z()

        os.system("clear")
        print("X=", x)
        print("Y=", y)
        print("Z=", z)
        time.sleep(0.1)

センサ値のフィルタリング

センサの出力値にはノイズが含まれているのでフィルタリングします。

ここでは移動平均フィルタを実装しました。 GitHub

移動平均フィルタを選択した理由は、実装が簡単だからです。欠点としては、遅れが発生します。

ロボットに加速度センサを搭載する場合は、精度および処理速度が必要になるため、カルマンフィルタなどを実装すると思います。

今回は俊敏に応答するロボットを想定していないので、多少の遅れは無視します。

状況に合わせて、必要なスペックのフィルタを実装してください。

average_filter.py

#!/usr/bin/env python
#encoding: utf8

import numpy as np
import math

# 移動平均フィルタ
# 平均値を計算するための個数は引数sizeで指定する
class AverageFilter():
    def __init__(self, size):
        if size <= 0:
            size = 1
        self._buffer = np.zeros(size, dtype=np.float32)
        self._buffer_size = size
        self._current_index = 0
        self._filtered_value = 0
        self._offset_value = 0

    def update(self, value):
        self._buffer[self._current_index] = value

        self._current_index += 1
        if self._current_index >= self._buffer_size:
            self._current_index = 0

        self._filtered_value = np.average(self._buffer)

    def get_value(self):
        return self._filtered_value - self._offset_value

    def offset(self):
        self._offset_value = self._filtered_value

速度コマンドの生成と送信

raspimouseはTwist型のcmd_velに従って走行します。

cmd_vel.linear.x が前後方向の速度、cmd_vel.angular.zが回転方向の速度です。

cmd_velの値は 加速度センサ値 × ゲインで生成しています。

しかし、このままではセンサ値が急激に変化するとcmd_velも急激に変動します。 そのため、一定の加速度で速度コマンドが変化するようにAccelConverterが計算しています。

また、ロボットが安定して停止するように、加速度センサ値にしきいを設けています。 これにより、加速度センサ値がぴったり0にならなくてもロボットが停止します。

ここでは加速度センサ値Yをcmd_vel.linear.xの計算に、 加速度センサ値Xをcmd_vel.angular.zの計算に使っています。逆にしても問題ありません。

また、Senderで設定しているパラメータは自由に変更可能です。 動かしながら調整してみてください。

controller.py

#!/usr/bin/env python
#encoding: utf8

import rospy
import math
import RPi.GPIO as GPIO

from std_msgs.msg import Float32
from geometry_msgs.msg import Twist
from std_srvs.srv import Trigger, TriggerResponse

from adxl345 import ADXL345
from average_filter import AverageFilter

class AccelConverter():
    def __init__(self, acc, gain, max_vel, halt_acc, halt_vel):
        self._target_vel = 0        # 速度出力値
        self._ACC = acc             # 加速度
        self._VEL_GAIN = gain       # 加速度センサの値を目標速度に変換するゲイン
        self._MAX_VEL = max_vel     # 最大出力速度
        self._HALT_ACC = halt_acc   # 入力が0になったと見なす加速度センサしきい値
        self._HALT_VEL = halt_vel   # 速度が0になったと見なす速度しきい値

    def convert(self, accel):
        velocity = 0
        if math.fabs(accel) > self._HALT_ACC:
            velocity = accel * self._VEL_GAIN
            
            # velocity limitter
            if math.fabs(velocity) > self._MAX_VEL:
                velocity = math.copysign(self._MAX_VEL, velocity)

            self._accelerate(velocity)
        else:
            self._accelerate(velocity)

            if math.fabs(self._target_vel) < self._HALT_VEL:
                self._target_vel = 0

    def _accelerate(self, velocity):
        if velocity > self._target_vel:
            self._target_vel += self._ACC
        elif velocity < self._target_vel:
            self._target_vel -= self._ACC

    def reset(self):
        self._target_vel = 0

    def target_velocity(self):
        return self._target_vel



class Sender():
    def __init__(self):
        self._cmd_vel_pub = rospy.Publisher('cmd_vel', Twist, queue_size=10)

        self._converter_forward = AccelConverter(
                acc = 0.01,         # unit: m/s^2
                gain = 1.0/100.0,   
                max_vel = 2.0,      # unit: m/s
                halt_acc = 50,      # accel sensor value, unit: rad/s^2
                halt_vel = 0.02     # cmd_vel value, unit: m/s
                )
        self._converter_angular = AccelConverter(
                acc = 0.05,         # unit: rad/s^2
                gain = 1.0/100.0,
                max_vel = 2.0,      # unit: rad/s
                halt_acc = 50,      # accel sensor value, unit: rad/s^2
                halt_vel = 0.02     # cmd_vel value, unit: rad/s
                )

    def convert_accel(self, accel_x, accel_y):
        self._converter_forward.convert(-accel_y)
        self._converter_angular.convert(-accel_x)

    def halt(self):
        self._converter_forward.reset()
        self._converter_angular.reset()

    def send(self):
        twist = Twist()
        twist.linear.x = self._converter_forward.target_velocity()
        twist.angular.z = self._converter_angular.target_velocity()
        self._cmd_vel_pub.publish(twist)

スイッチとROSノード

raspimouseはServiceの/motor_onを受け取らなければモータを回すことができません。

そのため、raspi_homeのタクトスイッチが押されたら/motor_onをcallするように記載しています。

/motor_off用のスイッチも設けています。

/motor_onをcallするところでは、平均値フィルタにオフセットをかけています。 オフセットにより、スイッチを押した時の角度が基準(0 rad)になるので、コントローラとして扱いやすくなります。

def main():
    # GPIO Settings
    PIN_RESTART = 4 # GPIO PIN for motor_on and filter offset
    PIN_STOP = 17   # GPIO PIN for motor_off
    SW_ON = GPIO.LOW # This value assignd by circuit logic
    GPIO.setmode(GPIO.BCM)
    GPIO.setup(PIN_RESTART, GPIO.IN)
    GPIO.setup(PIN_STOP, GPIO.IN)

    # Module Settings
    sender = Sender()
    adxl = ADXL345()
    adxl.setUp()

    avr_filter = {}
    avr_filter['x'] = AverageFilter(10)
    avr_filter['y'] = AverageFilter(10)
    avr_filter['z'] = AverageFilter(10)

    # Publisher Settings
    accel_pubs = {}
    accel_pubs['x'] = rospy.Publisher('accel_x', Float32, queue_size=10)
    accel_pubs['y'] = rospy.Publisher('accel_y', Float32, queue_size=10)
    accel_pubs['z'] = rospy.Publisher('accel_z', Float32, queue_size=10)

    raw_accel_pubs = {}
    raw_accel_pubs['x'] = rospy.Publisher('raw_accel_x', Float32, queue_size=10)
    raw_accel_pubs['y'] = rospy.Publisher('raw_accel_y', Float32, queue_size=10)
    raw_accel_pubs['z'] = rospy.Publisher('raw_accel_z', Float32, queue_size=10)

    # Service Settings
    motor_on = rospy.ServiceProxy('/motor_on', Trigger)
    motor_off = rospy.ServiceProxy('/motor_off', Trigger)

    # Paramter Settings
    axes = ['x', 'y', 'z']
    accel = {'x':0, 'y':0, 'z':0}

    rospy.init_node('pimouse_controller')
    r = rospy.Rate(60)

    while not rospy.is_shutdown():
        # restart
        if GPIO.input(PIN_RESTART) == SW_ON:
            # motor_on
            result = motor_on()
            if result.success:
                rospy.logdebug('motor restart')
            else:
                rospy.logerr('can not restart motor')

            # offset accel
            for axis in axes:
                avr_filter[axis].offset()

        # stop
        if GPIO.input(PIN_STOP) == SW_ON:
            # motor_off
            result = motor_off()
            if result.success:
                rospy.logdebug('motor stop')
            else:
                rospy.logerr('can not stop motor')

            # halt command
            sender.halt()

        # receive value
        for axis in axes:
            raw_value = adxl.get_value(axis)

            avr_filter[axis].update(raw_value)
            accel[axis] = avr_filter[axis].get_value()

            accel_pubs[axis].publish(accel[axis])
            raw_accel_pubs[axis].publish(raw_value)

        # send command
        sender.convert_accel(accel['x'], accel['y'])
        sender.send()
        r.sleep()


if __name__ == '__main__':

    try:
        main()
    except rospy.ROSInterruptException: pass

加速度センサの動作確認

adxl345.pyを実行してセンサ値の出力を確認します。

$ python2 adxl345.py

# 出力例
('X=', 3)
('Y=', 0)
('Z=', 0)

確認できたらノードを動かしてみましょう。

pimouse_controllerはラズパイマウスのROS Packageも使用するため、cloneしておいてください。GitHub

raspi_homeにsshで接続する端末を3つ用意し、 ひとつは$ roscore、ひとつは$ rosrun pimouse_controller controller.py、ひとつは$rostopic listを実行し、 TopicがPublishされていることを確認してください。

$ rostopic list

# 出力例
/accel_x
/accel_y
/accel_z
/cmd_vel
/raw_accel_x
/raw_accel_y
/raw_accel_z
/rosout
/rosout_agg

masterpc、raspi_home、raspimouseそれぞれを通信させる

ここまで用意できれば後はmasterpc、raspi_home、raspimouseがそれぞれ通信できれば完了です。

通信の確認として、先ほどのTopicをmasterpcとraspimouseで受信してみます。

まずはそれぞれのIPアドレスを確認しましょう。 $ ip addr で確認できます。

筆者の環境では

  • masterpc : 192.168.11.7
  • raspi_home : 192.168.11.8
  • raspimouse : 192.168.11.10

となっています。

IPアドレスはデバイスを起動するたびに変わる場合が有ります。 ルータでデバイスごとにアドレスを固定しておくと便利です。

続いてROSの環境変数ROS_MASTER_URIを設定します。

ROS_MASTER_URIはrosmasterを立ち上げているデバイスがどれなのかを表します。

masterpc、raspi_home、raspimouseそれぞれの.bashrc(筆者の環境では.zshrc)に以下の行を追加してください。

# raspi_homeをROS_MASTERにする
# IPアドレス部分は環境に合わせて変更してください。
export ROS_MASTER_URI=http://192.168.11.8:11311

それぞれに書けたら、$ source ~/.bashrc を実行して読み込みます。

では、raspi_homeでroscoreとcontroller.pyを起動します。

その後、masterpcとraspimouseで$ rostopic list を実行してください。

先ほどと同じようにTopicが表示されたらOKです。

試しに、$ rostopic echo /cmd_vel を実行してみましょう。 加速度センサを傾けて、cmd_velの値が変化すればOKです。

注意点として、masterpcとraspimouseのROS_MASTER_URIはraspi_homeのIPアドレスになっています。 そのため、masterpcもしくはraspimouseがroscoreを起動するとエラーとなります。

その場合は、ROS_MASTER_URIを自身のIPアドレスに設定しましょう。

ラズパイマウスを動かす

最後にラズパイマウスを動かします。

まずはraspi_homeで$ roscoreと、$ rosrun pimouse_controller controller.pyを実行します。

その後、raspimouseで$ rosrun pimouse_ros motor.pyを実行します。

不安な人は、ラズパイマウスのモータを浮かしておきましょう。(筆者は食器用スポンジの上に置いてます)

f:id:macakasit:20180603210533j:plain
図.ラズパイonスポンジ

モータの回転を確認したら床の上を走らせてみましょう。

冒頭の動画のように動いたらOKです。

参考ページ

ja/ROS/Tutorials/MultipleMachines - ROS Wiki

How to Use the ADXL345 on Raspberry Pi

Fritzingでの部品の作り方のコツ(部品データ付)

RaspberryPiではじめてのI2C通信〜温度計測編〜