ADB自动化线刷升级安卓车机版本

ADB自动化线刷升级安卓车机版本

实现功能:

1.从FTP下载最新线刷包。

2.检测adb连接是否正常。

3.进行adb root

4.push_file()把本地升级包EP40_IMAGES_ALL.zip推送到/data/update/temp/目录下。

5.用updatecmd()升级/data/update/temp/目录下的EP40_IMAGES_ALL.zip升级包。

获取最新升级包到本地

#!/usr/bin/env python

# -*- coding: utf-8 -*-

import os

import urllib.request

import re

from ftplib import FTP

from datetime import datetime

import time

def current_time():
    """Return the current local time as ``YYYY-MM-DD_HH-MM-SS``.

    The value is used as a timestamp prefix in the progress messages that
    this script prints.
    """
    return datetime.now().strftime('%Y-%m-%d_%H-%M-%S')

class FTPDownLoad:
    """Download the newest ``*_ALL.zip`` flashing package from an FTP server.

    The newest package is located by listing a release directory, entering
    the most recently modified sub-directory and taking the first file
    whose name ends in ``_ALL.zip``.
    """

    # Local directory the package is stored in unless the caller overrides it.
    DEFAULT_LOCAL_DIR = r'/home/devops/tools/autoupdatesystem/updatePackage'

    def __init__(self, host="10.4.12.203", username="Anonymous", password="", port=2112):
        """Store the connection settings; no network traffic happens here.

        Args:
            host: FTP server address.
            username: FTP login name.
            password: FTP password.
            port: FTP server port (the protocol default would be 21).
        """
        self.host = host
        self.username = username
        self.password = password
        self.port = port
        self.ftp = FTP()

    def connect(self):
        """Connect and log in to the FTP server, then enable passive mode."""
        # GBK so that file names containing Chinese characters are handled.
        self.ftp.encoding = 'GBK'
        self.ftp.set_debuglevel(0)  # 0 = no protocol debug output
        self.ftp.connect(host=self.host, port=self.port)
        self.ftp.login(self.username, self.password)
        self.ftp.set_pasv(True)

    def quit(self):
        """Close the connection to the FTP server."""
        self.ftp.quit()

    @staticmethod
    def _latest_dir_name(entries):
        """Return the name of the most recently modified directory entry.

        Args:
            entries: ``(name, facts)`` pairs as produced by ``FTP.mlsd()``.

        Raises:
            IndexError: if ``entries`` contains no directory (the same
                exception type the original indexing produced, but with an
                explanatory message).
        """
        directories = [entry for entry in entries if entry[1].get("type") == "dir"]
        if not directories:
            raise IndexError("no sub-directory found in the FTP listing")
        # MLSD 'modify' facts are YYYYMMDDHHMMSS strings, so string order is
        # chronological order.
        newest = max(directories, key=lambda entry: entry[1].get('modify', ''))
        return newest[0]

    @staticmethod
    def _package_entries(entries):
        """Return the file entries whose name ends in ``_ALL.zip``.

        The dot is escaped and the whole name must match, so companions such
        as ``x_ALL.zip.md5`` are not mistaken for the package.
        """
        return [
            entry for entry in entries
            if entry[1].get("type") == "file" and re.fullmatch(r'.*_ALL\.zip', entry[0])
        ]

    def download_file(self, ftp_file_path, local_dir=None):
        """Download the newest ``*_ALL.zip`` found below ``ftp_file_path``.

        Args:
            ftp_file_path: Absolute FTP directory that contains one
                sub-directory per release.
            local_dir: Directory the package is saved to; defaults to
                ``DEFAULT_LOCAL_DIR``. It is created when missing.

        Returns:
            The local path of the downloaded package.

        Raises:
            IndexError: if there is no release sub-directory, or the newest
                one contains no ``*_ALL.zip`` file.
        """
        if local_dir is None:
            local_dir = self.DEFAULT_LOCAL_DIR

        self.ftp.cwd(ftp_file_path)
        self.entries = list(self.ftp.mlsd())
        print(self.entries)

        latest_name = self._latest_dir_name(self.entries)
        print(latest_name)

        # Strip a trailing '/' first so the joined path has no '//' in it.
        dir_name = ftp_file_path.rstrip('/') + "/" + latest_name
        self.ftp.cwd(dir_name)
        self.entries = list(self.ftp.mlsd())
        print(self.entries)

        entries = self._package_entries(self.entries)
        print("匹配最新文件:", entries)
        if not entries:
            raise IndexError("no *_ALL.zip package found in " + dir_name)
        latest_file_name = entries[0][0]
        print("最新文件名:", latest_file_name)

        os.makedirs(local_dir, exist_ok=True)
        local_EP40 = os.path.join(local_dir, latest_file_name)

        print('self.host:', self.host)
        print('self.port:', self.port)
        # Build the URL from the configured host/port instead of a literal.
        self.url_EP40 = "ftp://{0}:{1}{2}/{3}".format(
            self.host, self.port, dir_name, latest_file_name)
        print(str(self.url_EP40))
        urllib.request.urlretrieve(str(self.url_EP40), local_EP40)
        return local_EP40

# 程序入口

if __name__ == '__main__':
    # FTP directory to scan; its newest sub-directory holds the package.
    ftp_file_path = "/EP40/IHU_Release/AndroidR/UAT/"
    # Unused: regex keyword list that would restrict which files are fetched.
    # ftp_file_key = ['update.zip', 'ReleaseNote.xls', 'update-mcu.*.zip']

    FTP_Download = FTPDownLoad()
    wait_time = 10

    try:
        print('start one loop package download process in %s' % current_time())
        FTP_Download.connect()
        try:
            FTP_Download.download_file(ftp_file_path)
        finally:
            # Close the session even when the download fails; only reached
            # once connect() has succeeded.
            FTP_Download.quit()
        print('finish one loop package download process in %s' % current_time())
    except Exception as e:
        # Top-level boundary: report the failure instead of a traceback.
        print(current_time()+': '+str(e))

    # The original sleeps on both the success and the failure path.
    time.sleep(wait_time)

升级安卓车机版本

# -*- coding: utf-8 -*-

import os

import sys

import time

from datetime import datetime

class AutoUpdateSystem:
    """Upgrade the head unit through its QNX recovery flow, driven by adb.

    The local ``update.zip`` package is pushed into the QNX file system
    mounted at ``/qnx``, a shell script writes the upgrade command into
    ``/qnx/update/recovery/command``, and ``reboot recovery`` restarts the
    device into upgrade mode.

    Each step runs one adb command with ``os.system``, keeps the exit status
    on the instance and returns ``True`` when that status is 0.
    """

    def _report(self, status, step):
        """Print a timestamped passed/failed line for ``step``; return success."""
        succeeded = status == 0
        outcome = 'passed' if succeeded else 'failed'
        print(current_time(), '----------{0} {1}! ----------'.format(step, outcome))
        return succeeded

    def adb_root(self):
        """Restart adbd with root permissions (``adb root``)."""
        self.adb_root_result = os.system('adb root')
        return self._report(self.adb_root_result, 'adb root')

    def mount(self):
        """Mount the NFS export ``172.16.1.69:/`` at ``/qnx`` on the device."""
        self.adb_mount = os.system('adb shell /sbin/busybox mount -t nfs -o nolock 172.16.1.69:/ /qnx')
        time.sleep(3)
        return self._report(self.adb_mount, 'adb mount')

    def create_file(self):
        """Create the staging directory ``/qnx/update/temp``."""
        self.mkdir_file = os.system('adb shell mkdir -p /qnx/update/temp')
        return self._report(self.mkdir_file, 'mkdir temp')

    def chmod_file(self):
        """Set mode 777 on ``/qnx/update/temp``."""
        self.change_power = os.system('adb shell chmod 777 /qnx/update/temp')
        return self._report(self.change_power, 'chmod file')

    def push_package_file(self, fname):
        """Push ``fname`` to ``/qnx/update/temp/update.zip``.

        Args:
            fname: Package path, resolved against the script directory
                (``sys.path[0]``) unless it is absolute.
        """
        local_package = os.path.join(sys.path[0], fname)
        # Quote the local path so a directory name with spaces survives.
        self.push_update_package = os.system(
            'adb push "{0}" /qnx/update/temp/update.zip'.format(local_package))
        return self._report(self.push_update_package, 'push package file')

    def push_script(self):
        """Remove the old recovery command and push ``Script/updateCommand.sh``."""
        local_script = os.path.join(sys.path[0], 'Script', 'updateCommand.sh')
        self.rm_script = os.system('adb shell rm -rf /qnx/update/recovery/command')
        # Kept under a separate name: assigning to self.push_script would
        # replace this method on the instance.
        self.push_script_result = os.system(
            'adb push "{0}" /qnx/update/recovery/'.format(local_script))
        print(local_script)
        return self._report(self.push_script_result, 'push script')

    def exec_script(self):
        """Run ``/qnx/update/recovery/updateCommand.sh`` on the device."""
        # Kept under a separate name so the method is not overwritten.
        self.exec_script_result = os.system('adb shell sh /qnx/update/recovery/updateCommand.sh')
        return self._report(self.exec_script_result, 'exec script')

    def exec_sync(self):
        """Run ``sync`` on the device."""
        self.sync = os.system('adb shell sync')
        return self._report(self.sync, 'exec sync')

    def reboot_recovery(self):
        """Reboot the device into recovery; the exit status is only stored."""
        self.rboot_recovery = os.system('adb shell reboot recovery')
        print(current_time(), '----------reboot_recovery ----------')

def current_time():
    """Return the current local time as ``YYYY-MM-DD_HH-MM-SS``.

    Used as the timestamp prefix of every status line this script prints.
    """
    return datetime.now().strftime('%Y-%m-%d_%H-%M-%S')

def TestAdbConnect(retry_interval=3, max_attempts=None):
    """Poll ``adb shell ls`` until the device answers.

    Args:
        retry_interval: Seconds to wait before each attempt.
        max_attempts: Maximum number of attempts; ``None`` (the default)
            retries forever, which is the original behavior.

    Returns:
        ``True`` once an attempt exits with status 0, ``False`` when
        ``max_attempts`` is exhausted first.
    """
    attempts = 0
    while max_attempts is None or attempts < max_attempts:
        attempts += 1
        time.sleep(retry_interval)
        if os.system('adb shell ls') == 0:
            print('adb可正常连接.')
            return True
        print('adb连接失败,请到系统设置->版本->关闭/开启adb模式')
    return False

class E40UpdateSystem:
    """Push an EP40 upgrade package to the device and trigger the upgrade.

    The caller is expected to check the adb connection and run ``adb root``
    first. ``push_file()`` then copies the local package into the staging
    directory and ``updatecmd()`` broadcasts the upgrade request for the
    package stored there.

    Each step runs adb through ``os.system``, keeps the exit status on the
    instance and returns ``True`` when that status is 0.
    """

    # Package name the upgrade broadcast points at by default.
    PACKAGE_NAME = 'EP40_IMAGES_ALL.zip'

    @staticmethod
    def _remote_dir(file_path):
        """Return ``file_path`` without trailing slashes.

        Raises:
            ValueError: if nothing is left (empty or ``/``), because the
                ``rm -rf <dir>/*`` issued by ``push_file`` would then wipe
                the device root.
        """
        remote_dir = file_path.strip().rstrip('/')
        if not remote_dir:
            raise ValueError('refusing to operate on device path %r' % (file_path,))
        return remote_dir

    def _report(self, status, step):
        """Print a timestamped passed/failed line for ``step``; return success."""
        succeeded = status == 0
        outcome = 'passed' if succeeded else 'failed'
        print(current_time(), '----------{0} {1}! ----------'.format(step, outcome))
        return succeeded

    def create_file(self, file_path):
        """Create the staging directory ``file_path`` on the device."""
        remote_dir = self._remote_dir(file_path)
        self.mkdir_file = os.system('adb shell mkdir -p {0}'.format(remote_dir))
        return self._report(self.mkdir_file, 'mkdir temp')

    def chmod_file(self, file_path):
        """Set mode 777 on every entry inside ``file_path``."""
        remote_dir = self._remote_dir(file_path)
        # Quoted so the glob is expanded by the device shell, not the host.
        self.change_power = os.system('adb shell "chmod 777 {0}/*"'.format(remote_dir))
        return self._report(self.change_power, 'chmod file')

    def push_file(self, fname, file_path):
        """Empty ``file_path`` on the device, then push ``fname`` into it.

        Args:
            fname: Local package path, resolved against the script directory
                (``sys.path[0]``) unless it is absolute.
            file_path: Staging directory on the device.
        """
        remote_dir = self._remote_dir(file_path)
        local_package = os.path.join(sys.path[0], fname)
        # '<dir>/*' only removes the directory's contents; the original
        # '<dir>*' also matched siblings when the trailing slash was missing.
        self.rm_update_package = os.system('adb shell "rm -rf {0}/*"'.format(remote_dir))
        self.push_update_package = os.system(
            'adb push "{0}" {1}/'.format(local_package, remote_dir))
        return self._report(self.push_update_package, 'push package file')

    def updatecmd(self, file_path, package_name=PACKAGE_NAME):
        """Broadcast the upgrade request for ``file_path/package_name``.

        Args:
            file_path: Staging directory on the device.
            package_name: File name of the pushed package; defaults to
                ``EP40_IMAGES_ALL.zip``.
        """
        remote_dir = self._remote_dir(file_path)
        self.update_cmd = os.system(
            'adb shell am broadcast -a com.hezon.update.action --es "path" "{0}/{1}"'.format(
                remote_dir, package_name))
        return self._report(self.update_cmd, 'update package file')

if __name__ == '__main__':
    # Staging directory on the device that receives the upgrade package.
    file_path = '/data/update/temp/'

    print(current_time(), '----------start update system ----------')

    TestAdbConnect()  # Block until adb can reach the device.
    time.sleep(2)

    EP31cmd = AutoUpdateSystem()
    EP31cmd.adb_root()  # Switch adbd to root.
    time.sleep(2)

    EP40cmd = E40UpdateSystem()
    EP40cmd.create_file(file_path)  # Create the staging directory.
    time.sleep(2)

    # Default package, relative to the script directory.
    fname = os.path.join('updatePackage', 'EP40_IMAGES_ALL.zip')
    print(fname)

    # An existing path given as the first argument overrides the default.
    # It is made absolute because push_file() resolves relative names
    # against the script directory, not the current working directory.
    if len(sys.argv) > 1 and os.path.exists(sys.argv[1]):
        fname = os.path.abspath(sys.argv[1])

    # Push the package into the staging directory. updatecmd() below looks
    # for EP40_IMAGES_ALL.zip there, so the pushed file must carry that name.
    EP40cmd.push_file(fname, file_path)
    time.sleep(2)

    EP40cmd.chmod_file(file_path)  # Set mode 777 on the pushed package.
    time.sleep(2)

    EP40cmd.updatecmd(file_path)  # Broadcast the upgrade request.

    # Wait ten minutes before printing the end marker.
    time.sleep(600)

    print(current_time(), '----------End update system ----------')

相关文章