首先导入一些编写SDK的一些必要的库
因为是无线的连接所以要加一个网络的线程库,用来连接TT
接着导入时间库,因为会有一些资源的监控函数
导入cv库,这里是cv2,因为底层是C++的实现
最后一个是另外一个资源的监控类
因为是打包SDK的指令,所以就写一个类就好
这里是已经封装好的一些指令
这里有的计算机是端口打开失败,这里的电脑是win10
这个是版本号
点击高级设置
新建一个入站的规则
选择端口
因为TT的通信方式都是UDP,所以这里UDP
允许所有,其实就8889就可以
都打开
写一些info,日后好看好改
重新打开可以设置更多的东西
def __init__(self, tello_ip: str = '192.168.10.2', debug: bool = True):
# 在初始化的函数里面就是传入一个ip以及是否要打开调试功能
# 打开本地的UDP端口在8889为了和Tello通信
self.local_ip = ''
self.local_port = 8889
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 开启新的插口,我这里是喜欢这么叫
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 通常使用这个设置来加强网络程序的健壮性,后面的参数是断开重新绑定时间,Win系统最多可以留5分钟
self.socket.bind((self.local_ip, self.local_port))
# bind的意义是将已经建立的“空白的”socket绑定在网络上让大家在需要时能够找得到
# 设置Tello的ip和端口信息
self.tello_ip = tello_ip
# 在类定义里面定义Tellode IP
self.tello_port = 8889
# tello的端口是8889
self.tello_address = (self.tello_ip, self.tello_port)
# 在将ip和端口打包成一个合成的地址
self.log = [] # 对于日志的记录是写用一个空列表来放
# 初始化接收线程
self.receive_thread = threading.Thread(target=self._receive_thread)
# 这个是新建立一个线程的时候要执行的语句,target的参数是最重要的,要给一个callback对象才可以运行
# 后面的参数是一个函数,在后面有写
self.receive_thread.daemon = True
# 子线程daemon默认跟随主线程
# 设置daemon=True会标记其为守护线程,如果剩下的线程只有守护线程时,整个python程序都会退出
self.receive_thread.start()
# 开启线程
# Tello 运行时选项
self.stream_state = False
# 检查视频流的情况
self.last_frame = None
# 最后一帧
self.MAX_TIME_OUT = 15.0
# 最长的超时时间
self.debug = debug
# 将Tello设置为命令模式
self.command()
首先写一个初始化的函数,我每一个变量都写了注释
日志功能的设计很简单,日志就是记录用的。
所以就是一个list,相关方法可用
发送命令的函数,比较健壮
是元组,就是sendto函数的地址参数
以及命令就是一个字符串
这个回复函数在另外一个文件里面,判断response是不是空
没错,就是判空
接着是线程函数,要不停的运行
这个是开关
这个函数,是我写整个SDK的初衷
官方的视频接口,我能力不够。一些操作来不了
def send_command(self, command: str, query: bool = False):
# 后面的参数是查询,前面是一个字符串,发送给TT用
# 为发出的命令创建的新日志
self.log.append(Stats(command, len(self.log)))
# 发送命令给TT
self.socket.sendto(command.encode('utf-8'), self.tello_address)
# 将数据发送到套接字,address是形式为(ipaddr,port)的元组,指定远程地址。
# 返回值是发送的字节数,函数主要用于UDP协议。
# 显示确认信息 (如果debug开)
if self.debug is True:
print('Sending command: {}'.format(command))
# 用Python的字符串方法打印
# 这两个语句就是日志
# 检查命令是否超时(基于“MAX_out_TINE”中的值)
start = time.time()
# 在日志中未接收到repsonse时运行
while not self.log[-1].got_response():
# 这got_response是以一个函数,在另外一个命令里面
now = time.time()
# 现在的时间
difference = now - start
# 计算完上面的函数,就是从log里面读取最后一个元素。是一个bool回复
if difference > self.MAX_TIME_OUT:
# 如果超时,
print('Connection timed out!')
# 打印连接超时
break
# 接着出去
# 打印Tello响应(如果“debug”为真)
if self.debug is True and query is False:
print('Response: {}'.format(self.log[-1].get_response()))
def _receive_thread(self):
# 记得我上面那个多线程函数的参数吗?,就是这个函数,注意写法。带下划线,就就不想用户用
# 回调函数把
while True:
# 死循环检查Tello响应,抛出套接字错误(就是一直要保持连接)
try:
# 这里是错误处理
self.response, ip = self.socket.recvfrom(1024)
# recvfrom的功能,以接收从所述设备发送数据的数据和源地址。
# 和参数response,ip相配合
# 返回值是一对(字符串,地址),其中字符串是表示接收数据的字符串,
# 地址是发送数据的套接字的地址。
self.log[-1].add_response(self.response)
# 接着把这个回复再添加到这个log里面
except socket.error as exc:
print('Socket error: {}'.format(exc))
# 最后是抛出一个错误,打印一下
def _video_thread(self):
# 啊啊啊啊啊,我写这么多的程序就是为了这个接口,嘤嘤嘤
# cv2.VideoCapture这个接口我一定要写出来
cap = cv2.VideoCapture('udp://'+self.tello_ip+':11111')
# 在“stream state”为真时运行
while self.stream_state:
# 因为是真,下面的语句执行
ret, self.last_frame = cap.read()
# 对于参数ret 为True 或者False,代表有没有读取到图片
# 第二个参数frame表示截取到一帧的图片
cv2.imshow('DJI Tello', self.last_frame)
# 接着是打印这个照片
# 如果按escape键,视频流将关闭
k = cv2.waitKey(1) & 0xFF
if k == 27:
break
cap.release()
cv2.destroyAllWindows()
此时是两个函数的代码
这个是延时得函数
这个是打印日志列表
关闭得函数,就是把端口得资源释放。比较简单得实现
这个函数,记得初始化完毕就调用
分别是起飞和降落,以及最后得下视得摄像头得开启要用最新的测试的固件来打开