起点亦是终点,世间有轮回,相似又不相同。就像书里写的那样。江湖因一声“小二上酒”开始,最后也因一杯绿蚁戛然而止。生活,也是如此吧?
飞控不是一下做出来的,下面是一些重要传感器的mpy驱动代码,为飞控项目添砖加瓦。
from pyb import Pin
from time import sleep_us,ticks_us
class HC():
def __init__(self,trig='C5',echo='C4'):
self.trig = Pin(trig, Pin.OUT_PP)
self.echo = Pin(echo, Pin.IN)
def trigUp(self):
self.trig.value(1)
sleep_us(25)
self.trig.value(0)
def getlen(self):
distance = 0
self.trigUp()
while self.echo.value() == 0:
pass
ts = ticks_us() # 开始时间
while self.echo.value() == 1:
pass
te = ticks_us() # 结束时间
tc = te - ts # 回响时间(单位us)
distance = (tc * 170) / 10000 # 距离计算(单位为:cm)
return distance
超声波传感器
'''
参考:https://blog.csdn.net/qq_38721302/article/details/83095545
'''
MPU_ADDR=0X68
WHO_AM_I_VAL = MPU_ADDR
MPU_PWR_MGMT1_REG = 0x6B # 电源管理寄存器1
MPU_GYRO_CFG_REG = 0X1B # 陀螺仪配置寄存器
MPU_ACCEL_CFG_REG = 0X1C # 加速度传感器配置寄存器
MPU_SAMPLE_RATE_REG=0X19 # 设置MPU6050的采样率
MPU_CFG_REG=0X1A # 配置寄存器
MPU_INT_EN_REG=0X38
MPU_USER_CTRL_REG=0X6A
MPU_FIFO_EN_REG=0X23
MPU_INTBP_CFG_REG=0X37
MPU_DEVICE_ID_REG=0X75
MPU_GYRO_XOUTH_REG=0X43
MPU_GYRO_XOUTL_REG=0X44
MPU_GYRO_YOUTH_REG=0X45
MPU_GYRO_YOUTL_REG=0X46
MPU_GYRO_ZOUTH_REG=0X47
MPU_GYRO_ZOUTL_REG=0X48
MPU_ACCEL_XOUTH_REG=0X3B
MPU_ACCEL_XOUTL_REG=0X3C
MPU_ACCEL_YOUTH_REG=0X3D
MPU_ACCEL_YOUTL_REG=0X3E
MPU_ACCEL_ZOUTH_REG=0X3F
MPU_ACCEL_ZOUTL_REG=0X40
MPU_TEMP_OUTH_REG=0X41
MPU_TEMP_OUTL_REG=0X42
config_gyro_range = 3 # 0,±250°/S;1,±500°/S;2,±1000°/S;3,±2000°/S
config_accel_range = 0# 0,±2g;1,±4g;2,±8g;3,±16g
class MPU6050():
def __init__(self,iicbus,address=WHO_AM_I_VAL):
self._address = address
self._bus = iicbus
self.reset()
def _write_byte(self,cmd,val):
self._bus.mem_write(val,self._address,cmd)
def _read_byte(self,cmd):
buf = self._bus.mem_read(1,self._address,cmd,addr_size=8)
return int(buf[0])
def reset(self):
self._write_byte(MPU_PWR_MGMT1_REG, 0x00) # 配置电源管理寄存器 启动MPU6050
self._write_byte(MPU_GYRO_CFG_REG, config_gyro_range<<3) # 陀螺仪传感器,±2000dps
self._write_byte(MPU_ACCEL_CFG_REG, config_accel_range<<3)# 加速度传感器,±2g
self._write_byte(MPU_SAMPLE_RATE_REG, 0x07) # 采样频率 100
self._write_byte(MPU_CFG_REG, 0x06) # 设置数字低通滤波器
self._write_byte(MPU_INT_EN_REG,0X00) #关闭所有中断
self._write_byte(MPU_USER_CTRL_REG,0X00) #I2C主模式关闭
self._write_byte(MPU_FIFO_EN_REG,0X00) #关闭FIFO
self._write_byte(MPU_INTBP_CFG_REG,0X80) #INT引脚低电平有效
buf = self._read_byte(MPU_DEVICE_ID_REG)
if buf != self._address:
print("NPU6050 not found!")
else:
pass
def _read_byte(self,cmd):
buf = self._bus.mem_read(1,self._address,cmd,addr_size=8)
return int(buf[0])
def _read_u16(self,reg):
MSB = self._read_byte(reg)
LSB = self._read_byte(reg)
return (MSB<< 8) + LSB
def _read_s16(self,reg):
result = self._read_u16(reg)
if result > 32767:result -= 65536
return result
def read_Gyro_x(self):
x = self._read_s16(MPU_GYRO_XOUTH_REG)
return x
def read_Gyro_y(self):
y = self._read_s16(MPU_GYRO_YOUTH_REG)
return y
def read_Gyro_z(self):
z = self._read_s16(MPU_GYRO_ZOUTH_REG)
return z
def read_Accel_x(self):
x = self._read_s16(MPU_ACCEL_XOUTH_REG)
return x
def read_Accel_y(self):
y = self._read_s16(MPU_ACCEL_YOUTH_REG)
return y
def read_Accel_z(self):
z = self._read_s16(MPU_ACCEL_ZOUTH_REG)
return z
def read_Temp(self):
temp = self._read_s16(MPU_TEMP_OUTH_REG)
return temp
from pyb import I2C
i2c = I2C(2, I2C.MASTER)
mpu = MPU6050(i2c)
def GyroToDegree(num):
return num / 32768 * 2000
def AccelToGram(num):
return num / 32768 * 2
MPU6050
from imu import InvenSenseMPU, bytes_toint, MPUException
from imu import Vector3d
class MPU9250(InvenSenseMPU):
'''
MPU9250 constructor arguments
1. side_str 'X' or 'Y' depending on the Pyboard I2C interface being used
2. optional device_addr 0, 1 depending on the voltage applied to pin AD0 (Drotek default is 1)
if None driver will scan for a device (if one device only is on bus)
3, 4. transposition, scaling optional 3-tuples allowing for outputs to be based on vehicle
coordinates rather than those of the sensor itself. See readme.
'''
_mpu_addr = (104, 105) # addresses of MPU9250 determined by voltage on pin AD0
_mag_addr = 12 # Magnetometer address
_chip_id = 113
def __init__(self, side_str, device_addr=None, transposition=(0, 1, 2), scaling=(1, 1, 1)):
super().__init__(side_str, device_addr, transposition, scaling)
self._mag = Vector3d(transposition, scaling, self._mag_callback)
self.accel_filter_range = 0 # fast filtered response
self.gyro_filter_range = 0
self._mag_stale_count = 0 # MPU9250 count of consecutive reads where old data was returned
self.mag_correction = self._magsetup() # 16 bit, 100Hz update.Return correction factors.
self._mag_callback() # Seems neccessary to kick the mag off else 1st reading is zero (?)
def sensors(self):
'''
returns sensor objects accel, gyro and mag
'''
return self._accel, self._gyro, self._mag
# get temperature
def temperature(self):
'''
Returns the temperature in degree C.
'''
try:
self._read(self.buf2, 0x41, self.mpu_addr)
except OSError:
raise MPUException(self._I2Cerror)
return bytes_toint(self.buf2[0], self.buf2[1])/333.87 + 21 # I think
# Low pass filters
def gyro_filter_range(self):
'''
Returns the gyro and temperature sensor low pass filter cutoff frequency
Pass: 0 1 2 3 4 5 6 7
Cutoff (Hz): 250 184 92 41 20 10 5 3600
Sample rate (KHz): 8 1 1 1 1 1 1 8
'''
try:
self._read(self.buf1, 0x1A, self.mpu_addr)
res = self.buf1[0] & 7
except OSError:
raise MPUException(self._I2Cerror)
return res
def gyro_filter_range(self, filt):
'''
Sets the gyro and temperature sensor low pass filter cutoff frequency
Pass: 0 1 2 3 4 5 6 7
Cutoff (Hz): 250 184 92 41 20 10 5 3600
Sample rate (KHz): 8 1 1 1 1 1 1 8
'''
if filt in range(8):
try:
self._write(filt, 0x1A, self.mpu_addr)
except OSError:
raise MPUException(self._I2Cerror)
else:
raise ValueError('Filter coefficient must be between 0 and 7')
def accel_filter_range(self):
'''
Returns the accel low pass filter cutoff frequency
Pass: 0 1 2 3 4 5 6 7 BEWARE 7 doesn't work on device I tried.
Cutoff (Hz): 460 184 92 41 20 10 5 460
Sample rate (KHz): 1 1 1 1 1 1 1 1
'''
try:
self._read(self.buf1, 0x1D, self.mpu_addr)
res = self.buf1[0] & 7
except OSError:
raise MPUException(self._I2Cerror)
return res
def accel_filter_range(self, filt):
'''
Sets the accel low pass filter cutoff frequency
Pass: 0 1 2 3 4 5 6 7 BEWARE 7 doesn't work on device I tried.
Cutoff (Hz): 460 184 92 41 20 10 5 460
Sample rate (KHz): 1 1 1 1 1 1 1 1
'''
if filt in range(8):
try:
self._write(filt, 0x1D, self.mpu_addr)
except OSError:
raise MPUException(self._I2Cerror)
else:
raise ValueError('Filter coefficient must be between 0 and 7')
def _magsetup(self): # mode 2 100Hz continuous reads, 16 bit
'''
Magnetometer initialisation: use 16 bit continuous mode.
Mode 1 is 8Hz mode 2 is 100Hz repetition
returns correction values
'''
try:
self._write(0x0F, 0x0A, self._mag_addr) # fuse ROM access mode
self._read(self.buf3, 0x10, self._mag_addr) # Correction values
self._write(0, 0x0A, self._mag_addr) # Power down mode (AK8963 manual 6.4.6)
self._write(0x16, 0x0A, self._mag_addr) # 16 bit (0.15uT/LSB not 0.015), mode 2
except OSError:
raise MPUException(self._I2Cerror)
mag_x = (0.5*(self.buf3[0] - 128))/128 + 1
mag_y = (0.5*(self.buf3[1] - 128))/128 + 1
mag_z = (0.5*(self.buf3[2] - 128))/128 + 1
return (mag_x, mag_y, mag_z)
def mag(self):
'''
Magnetomerte object
'''
return self._mag
def _mag_callback(self):
'''
Update magnetometer Vector3d object (if data available)
'''
try: # If read fails, returns last valid data and
self._read(self.buf1, 0x02, self._mag_addr) # increments mag_stale_count
if self.buf1[0] & 1 == 0:
return self._mag # Data not ready: return last value
self._read(self.buf6, 0x03, self._mag_addr)
self._read(self.buf1, 0x09, self._mag_addr)
except OSError:
raise MPUException(self._I2Cerror)
if self.buf1[0] & 0x08 > 0: # An overflow has occurred
self._mag_stale_count += 1 # Error conditions retain last good value
return # user should check for ever increasing stale_counts
self._mag._ivector[1] = bytes_toint(self.buf6[1], self.buf6[0]) # Note axis twiddling and little endian
self._mag._ivector[0] = bytes_toint(self.buf6[3], self.buf6[2])
self._mag._ivector[2] = -bytes_toint(self.buf6[5], self.buf6[4])
scale = 0.15 # scale is 0.15uT/LSB
self._mag._vector[0] = self._mag._ivector[0]*self.mag_correction[0]*scale
self._mag._vector[1] = self._mag._ivector[1]*self.mag_correction[1]*scale
self._mag._vector[2] = self._mag._ivector[2]*self.mag_correction[2]*scale
self._mag_stale_count = 0
def mag_stale_count(self):
'''
Number of consecutive times where old data was returned
'''
return self._mag_stale_count
def get_mag_irq(self):
'''
Uncorrected values because floating point uses heap
'''
self._read(self.buf1, 0x02, self._mag_addr)
if self.buf1[0] == 1: # Data is ready
self._read(self.buf6, 0x03, self._mag_addr)
self._read(self.buf1, 0x09, self._mag_addr) # Mandatory status2 read
self._mag._ivector[1] = 0
if self.buf1[0] & 0x08 == 0: # No overflow has occurred
self._mag._ivector[1] = bytes_toint(self.buf6[1], self.buf6[0])
self._mag._ivector[0] = bytes_toint(self.buf6[3], self.buf6[2])
self._mag._ivector[2] = -bytes_toint(self.buf6[5], self.buf6[4])
MPU9250
I2C接线图
使用MPU6050 Yaw轴会不可避免的飘动,MPU9250自带磁力计,可以减少飘动。
# 控制电机函数
from pyb import Timer,Pin,ADC
import time
class Motor():
# 电机pwm初始化
def __init__(self,isInit=False):
timerMotor_1 = Timer(3, freq=50)
timerMotor_2 = Timer(4, freq=50)
self.motor1 = timerMotor_1.channel(1, Timer.PWM, pin=Pin('B4'))
self.motor2 = timerMotor_1.channel(2, Timer.PWM, pin=Pin('B5'))
self.motor3 = timerMotor_2.channel(3, Timer.PWM, pin=Pin('B8'))
self.motor4 = timerMotor_2.channel(4, Timer.PWM, pin=Pin('B9'))
self.motors = [self.motor1,self.motor2,self.motor3,self.motor4]
# self.x = ADC(Pin('X2'))
# self.btn_stop = Pin('X4',Pin.IN)
if not isInit:
for moto in self.motors:
self.MotoSet(moto)
time.sleep(1)
self.MotosPwmUpdate([0,0,0,0])
# 电机初始化 设置最高油门和最低油门
def MotoSet(self,moto):
moto.pulse_width_percent(10)
time.sleep(2)
moto.pulse_width_percent(5)
# pwm 更新函数 1
# 可以用于调试单个电机
def MotoPwmUpdate(self,n,pwm):
if pwm < 0 or pwm > 100:
return None
self.motors[n].pulse_width_percent(5 + pwm*5/100)
# pwm 更新函数 2
# 用于实际飞行
def MotosPwmUpdate(self,pwms):
for moto,pwm in zip(self.motors,pwms):
moto.pulse_width_percent(5 + pwm*5/100)
# 电机停止转动
# 用于紧急制动和测试
def MotoStop(self):
for moto in self.motors:
moto.pulse_width_percent(5)
电调控制,PWM实现
然后就是PWM的接收机太费引脚,换PPM,问题在于转换~
上面是PPM
下面是PWM
标准的PPM信号,以0.4ms的低电平为起始标识。后边以电平的上升沿的间隔时间来表达各个通道的控制量。一般排列10个上升沿后,电平保持高电平,直到重复下一个PPM信号。
PPM信号可以看做是一帧数据,它包含了8个通道的信息。每个上升沿间隔时间刚好等于PWM信号的高电平持续时间,也就1000us~2000us之间。
PPM的重复周期也为20ms,也是50hz的刷新频率。
import pyb
import micropython
micropython.alloc_emergency_exception_buf(100)
# Futaba PPM decoder
# http://diydrones.com/profiles/blogs/705844:BlogPost:38393
class Decoder():
def __init__(self, pin: str):
self.pin = pin
self.current_channel = -1
self.channels = [0] * 20 # up to 10 channels
self.timer = pyb.Timer(2, prescaler=83, period=0x3fffffff)
self.timer.counter(0)
# clear any previously set interrupt
pyb.ExtInt(pin, pyb.ExtInt.IRQ_RISING, pyb.Pin.PULL_NONE, None)
self.ext_int = pyb.ExtInt(pin, pyb.ExtInt.IRQ_RISING, pyb.Pin.PULL_NONE, self._callback)
def _callback(self, t):
ticks = self.timer.counter()
if ticks > 5000:
self.current_channel = 0
elif self.current_channel > -1:
self.channels[self.current_channel] = ticks
self.current_channel += 1
self.timer.counter(0)
def get_channel_value(self, channel: int) -> int:
return self.channels[channel]
def enable(self):
self.ext_int.enable()
def disable(self):
self.ext_int.disable()
PPM驱动代码
# 匿名上位机 移植上位机、协议版本v4.34
# 移植部分功能
# 关闭了数据的校验功能,提高发送速度
class ANO():
def __init__(self,uart):
self.writechar = uart.writechar
self.BYTE0 = lambda x : (x0)&0xff
self.BYTE1 = lambda x : (x8)&0xff
self.BYTE2 = lambda x : (x16)&0xff
self.BYTE3 = lambda x : (x24)&0xff
pass
# 发送数据
def ANO_Send_Data(self,data):
for v in data:
self.writechar(v)
#send 数据
def cs(self,data_to_send):
_cnt = len(data_to_send)
data_to_send[3] = _cnt - 4
data_to_send.append(0) # 需要校验功能的将这一行改了就行
self.ANO_Send_Data(data_to_send)
# 飞机姿态 高度 飞行模式 是否解锁
def ANO_DT_Send_Status(self,angle_rol, angle_pit, angle_yaw, alt, fly_model, armed):
_cnt = 0
_temp = 0
_temp2 = alt
data_to_send = []
data_to_send.append(0xAA)
data_to_send.append(0xAA)
data_to_send.append(0x01)
data_to_send.append(0)
_temp = int(angle_rol * 100)
data_to_send.append(self.BYTE1(_temp))
data_to_send.append(self.BYTE0(_temp))
_temp = int(angle_pit * 100)
data_to_send.append(self.BYTE1(_temp))
data_to_send.append(self.BYTE0(_temp))
_temp = int(angle_yaw * 100)
data_to_send.append(self.BYTE1(_temp))
data_to_send.append(self.BYTE0(_temp))
data_to_send.append(self.BYTE3(_temp2))
data_to_send.append(self.BYTE2(_temp2))
data_to_send.append(self.BYTE1(_temp2))
data_to_send.append(self.BYTE0(_temp2))
data_to_send.append(fly_model)
data_to_send.append(armed)
self.cs(data_to_send)
# PID信息
def ANO_DT_Send_PID(self,group, p1_p, p1_i, p1_d, p2_p, p2_i, p2_d, p3_p, p3_i, p3_d):
_cnt = 0
_temp = 0
data_to_send = []
data_to_send.append(0xAA)
data_to_send.append(0xAA)
data_to_send.append(0x10 + group -1)
data_to_send.append(0)
_temp = p1_p * 1000
data_to_send.append(self.BYTE1(_temp))
data_to_send.append(self.BYTE0(_temp))
_temp = p1_i * 1000
data_to_send.append(self.BYTE1(_temp))
data_to_send.append(self.BYTE0(_temp))
_temp = p1_d * 1000
data_to_send.append(self.BYTE1(_temp))
data_to_send.append(self.BYTE0(_temp))
_temp = p2_p * 1000
data_to_send.append(self.BYTE1(_temp))
data_to_send.append(self.BYTE0(_temp))
_temp = p2_i * 1000
data_to_send.append(self.BYTE1(_temp))
data_to_send.append(self.BYTE0(_temp))
_temp = p2_d * 1000
data_to_send.append(self.BYTE1(_temp))
data_to_send.append(self.BYTE0(_temp))
_temp = p3_p * 1000
data_to_send.append(self.BYTE1(_temp))
data_to_send.append(self.BYTE0(_temp))
_temp = p3_i * 1000
data_to_send.append(self.BYTE1(_temp))
data_to_send.append(self.BYTE0(_temp))
_temp = p3_d * 1000
data_to_send.append(self.BYTE1(_temp))
data_to_send.append(self.BYTE0(_temp))
self.cs(data_to_send)
# 传感器数据
def ANO_DT_Send_Senser(self,a_x, a_y, a_z, g_x, g_y, g_z, m_x, m_y, m_z):
_cnt = 0
_temp = 0
data_to_send = []
data_to_send.append(0xAA)
data_to_send.append(0xAA)
data_to_send.append(0x02)
data_to_send.append(0)
_temp = a_x
data_to_send.append(self.BYTE1(_temp))
data_to_send.append(self.BYTE0(_temp))
_temp = a_y
data_to_send.append(self.BYTE1(_temp))
data_to_send.append(self.BYTE0(_temp))
_temp = a_z
data_to_send.append(self.BYTE1(_temp))
data_to_send.append(self.BYTE0(_temp))
_temp = g_x
data_to_send.append(self.BYTE1(_temp))
data_to_send.append(self.BYTE0(_temp))
_temp = g_y
data_to_send.append(self.BYTE1(_temp))
data_to_send.append(self.BYTE0(_temp))
_temp = g_z
data_to_send.append(self.BYTE1(_temp))
data_to_send.append(self.BYTE0(_temp))
_temp = m_x
data_to_send.append(self.BYTE1(_temp))
data_to_send.append(self.BYTE0(_temp))
_temp = m_y
data_to_send.append(self.BYTE1(_temp))
data_to_send.append(self.BYTE0(_temp))
_temp = m_z
data_to_send.append(self.BYTE1(_temp))
data_to_send.append(self.BYTE0(_temp))
self.cs(data_to_send)
# 电机pwm数据 0-1000
def ANO_DT_Send_MotoPWM(self, m_1, m_2, m_3, m_4, m_5, m_6, m_7, m_8):
_cnt = 0
data_to_send = []
data_to_send.append(0xAA)
data_to_send.append(0xAA)
data_to_send.append(0x06)
data_to_send.append(0)
data_to_send.append(self.BYTE1(m_1))
data_to_send.append(self.BYTE0(m_1))
data_to_send.append(self.BYTE1(m_2))
data_to_send.append(self.BYTE0(m_2))
data_to_send.append(self.BYTE1(m_3))
data_to_send.append(self.BYTE0(m_3))
data_to_send.append(self.BYTE1(m_4))
data_to_send.append(self.BYTE0(m_4))
data_to_send.append(self.BYTE1(m_5))
data_to_send.append(self.BYTE0(m_5))
data_to_send.append(self.BYTE1(m_6))
data_to_send.append(self.BYTE0(m_6))
data_to_send.append(self.BYTE1(m_7))
data_to_send.append(self.BYTE0(m_7))
data_to_send.append(self.BYTE1(m_8))
data_to_send.append(self.BYTE0(m_8))
self.cs(data_to_send)
匿名上位机若干协议移植,2.4G控制
下面是个相似的项目:
惯性测量单元 (IMU)
SBUS接口(用于连接SBUS RC接收器)
就是这么简单的东西。。。
airPy代码分为3个模块:
airPy 固件:运行在 pyboard 上的 python 代码
airPy 地面站:在 PC 上运行的 JavaFx 代码,用于配置/调整 airPy 板
airPy Link Protocol:用于板卡与地面站通信的串行协议
airPy Link Protocol目前仅用于配置和调整目的。
它是一种可变大小的串行协议,用于 pyboard 和地面站之间的数据通信。
这个想法是不仅在 USB 上而且在 Wi-fi、蓝牙和 ad hoc RF 通道(例如 433MHz)上使用这个协议。
每个 APLINK 消息都包含一个标头和一个可变大小的有效负载(取决于内容),如以下架构中所指定。
参考文章:
https://github.com/micropython-IMU/micropython-mpu9x50
microPython-MPU9x50驱动
https://www.jianshu.com/u/38cc558a2c62
micropython驱动的移植者
https://blog.csdn.net/qq_38721302/article/details/83095545
STM32 驱动MPU6050(使用了本文的寄存器定义)
https://github.com/Sokrates80/air-py
特别的,有一个microPython的飞控开源项目了,减少了我的工作量。
https://github.com/Sokrates80/airPy-GS
相应搭配的地面站
https://github.com/micropython-IMU/micropython-fusion
MicroPython 的单个平台上获取传感器数据并执行融合的情况
https://github.com/wagnerc4/flight_controller
另外一个完整的项目
https://github.com/ArduPilot/ardupilot
pid参考了这里,APM飞控
http://www.air-py.com/
相关网站
开源的mpy飞控系统