目的:
通过添加设备号,则自动给添加的设备分配端口,启动对应的appium服务。注意:为了方便,将共用一个配置文件。
1、公共的配置文件名称:desired_caps.yaml
platformVersion: 5.1.1
platformName: Android
deviceName: oppo
appPackage: com.iBer.iBerAppV2
appActivity: com.iBer.iBerAppV2.MainActivity
#appPackage: com.android.mms
#appActivity: /com.qiku.android.mms.ui.MmsConversationListActivity
noReset: False
unicodeKeyborad: True #使用Unicode编码方式发送字符串
resetKeyborad: True #隐藏键盘
ip: 127.0.0.1
#port: 4723
#devices_list: ["7f4bd69a","57614229"] #给出需要启动的设备udid,此处启动2个真机
#devices_list: ["57614229","127.0.0.1:62001"]
devices_list: ["57614229"]
phone: ["12606666333"]
2、自动根据添加的设备分配端口,前提:检查当前分配的端口是否被占用,若已被占用则自动删除此端口的进程,重新分配此端口。文件名称:Check_port.py
# -*- coding: utf-8 -*-
'''
意义:端口的自动检测
'''
import socket
import os
import re
def check_port(host,port):
'''检测端口是否可用'''
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM) #socket.SOCK_STREAM 或 SOCK_DGRAM
try:
s.connect((host,port))
s.shutdown(2) #禁止在一个socket上进行数据的接收与发送,利用shutdown函数使socket双向数据传输变为单向数据传输,
# shutdown需要一个单独的参数,该参数表示了如何关闭socket,0:表示禁止将来读,1:表示禁止将来写,2:表示禁止将来读写
#except OSError as msg:
except:
print("port %s is avaliable")%port #端口可用
return True
else:
print ("port %s alreadly be in use !!!")%port #端口已被占用
return False
def release_port(port):
print"release_port-------------杀掉已占用的端口"
'''杀掉正在执行的端口'''
cmd = "lsof -i:%s|awk 'NR==2{print $2}'" % port
pid = os.popen(cmd).read()
print "find port:%s, pid is value:%s" %(port,pid)
cmd = "kill -9 %s" % pid
print "kill port:%s,pid value:%s"%(port,pid)
os.popen(cmd).read()
# if __name__ =="__main__":
# host = "127.0.0.1"
# port = 4726
# check_port(host,port)
# release_port(port)
3、检查appium服务是否开启,根据当前连接的设备数量,自动开启对应的appium服务,文件名称为:muti_appium.py
# -*- coding: utf-8 -*-
'''
python启动多个appium服务----并发
'''
import subprocess
from time import ctime
import sys,os
import yaml
reload(sys)
sys.setdefaultencoding('utf8')
path = os.getcwd()
print path
with open(path+"/desired_caps.yaml","r") as file:
data = yaml.load(file)
devices_list = data["devices_list"]
def appium_start(host,port):
print "-----------------------------appium_start-----------------------------"
'''启动appium服务'''
#此处的port+3是为了appium服务的启动端口不跟multi_device中的端口重复
bootstrap_port = str(port+3)
print "bootstrap_port" + str(bootstrap_port)
'''# win
start /b appium -a 127.0.0.1 -p 4723 --log xxx.log --local-timezone
# mac
appium -a 127.0.0.1 -p 4723 --log xxx.log --local-timezone &'''
# cmd = "start /b appium -a"+host+" -p "+str(port)+" -bp "+str(bootstrap_port) windows上的写法
cmd = "appium -a " + host + " -p " + str(port) + " -bp " + str(bootstrap_port)
print "%s at %s"%(cmd,ctime())
print path+'/appium_log/'+str(port)+'.log'
logpath = os.getcwd()[:-5]
print logpath
subprocess.Popen(cmd,shell=True,stdout=open(logpath+'/appium_log/'+str(port)+'.log','wa'),stderr=subprocess.STDOUT)
# if __name__ == "__main__":
# host = "127.0.0.1"
# port = 4723
# appium_start(host,port)
# appium_process = [] lucky注销,释放开则是并发的启动appium服务
#
for i in range(len(devices_list)): #根据连接Android设备的数量,可更改此处的值
host = "127.0.0.1"
port = 4723+2+i
print port
appium_start(host, port)
# appium = multiprocessing.Process(target=appium_start,args=(host,port))
# appium_process.append(appium)
# if __name__ == "__main__": lucky注销
#
# for appium in appium_process:
# appium.start()
# for appium in appium_process:
# appium.join()
4、检查当前添加的设备号,自动开始执行多设备。文件名称为:muti_device.py
# -*- coding: utf-8 -*-
from appium import webdriver
from time import ctime
import yaml
import sys
from Add_Case_Gather import Run_test
import os,time
from time import sleep
#为了读取yaml中的中文,否则会报错
reload(sys)
sys.setdefaultencoding('utf8')
#获取desired_caps.yaml的存放路径
path = os.getcwd()
with open(path+"/desired_caps.yaml","r") as file:
data = yaml.load(file)
devices_list = data["devices_list"]
#开始执行设备
for i in range(len(devices_list)):
port = 4723 + 2 + i
def appium_desire(udid,port):
print "--------------------appium_desire-------------------------"
desired_caps = {}
desired_caps["platformName"] = data["platformName"]
desired_caps["platformVersion"] = data["platformVersion"]
desired_caps["deviceName"] = data["deviceName"]
desired_caps["udid"]=udid
desired_caps["appPackage"] = data["appPackage"]
desired_caps["appActivity"] = data["appActivity"]
desired_caps["noReset"] = data["noReset"]
print ("appium port:%s start run %s at %s" %(port,udid,ctime()))
print str(data['ip'])+str(port)
driver = webdriver.Remote('http://'+str(data['ip'])+':'+str(port)+'/wd/hub',desired_caps)
print driver
#调用需要执行的步骤操作方法
Run_test(driver).run_test1()
return
5、将如上的文件共同在一个文件中进行启动调用,文件名称为:Run_Test.py
# -*- coding: utf-8 -*-
'''并发的测试
主要功能:
1、检查给定的端口是否被占用,如果占用则自动释放
2、并发启动appium服务
3、并发启动device服务
'''
from Test.Common.multi_appium import appium_start
#from multi_device import appium_desire
from Test.Common.multi_device import appium_desire
from Test.Common.Check_port import *
from time import sleep
import multiprocessing
import sys
import yaml
#为了读取yaml中的中文,否则会报错
reload(sys)
sys.setdefaultencoding('utf8')
# 获取desired_caps.yam的路径
path = os.getcwd()
with open(path+"/desired_caps.yaml","r") as file:
data = yaml.load(file)
devices_list = data["devices_list"]
def start_appium_action(host,port):
print "start_appium_action------------------"
if check_port(host,port)==False:
release_port(port)
elif check_port(host,port)==True:
appium_start(host, port)
return True
else:
print("appium %s start faild!"%port)
return False
def start_devices_action(udid,port):
host = "127.0.0.1"
appium_desire(udid, port)
def appium_start_sync():
appium_process = []
for i in range(len(devices_list)):
host = "127.0.0.1"
port = 4723+2+i
appium = multiprocessing.Process(target=start_appium_action,args=(host,port))
appium_process.append(appium)
for appium in appium_process:
appium.start()
for appium in appium_process:
appium.join()
sleep(5)
def devices_start_sync():
desired_process = []
for i in range(len(devices_list)):
port = 4723+2+i
desired = multiprocessing.Process(target=start_devices_action, args=(devices_list[i], port))
desired_process.append(desired)
for desired in desired_process:
desired.start()
for desired in desired_process:
desired.join()
if __name__ == "__main__":
appium_start_sync()
devices_start_sync()
如上已完成了多设备的添加,如果需要新增多个设备,则在desired_caps.yaml中的 devices_list: ["57614229"]中新增设备号,然后运行Run_Test.py即可。
1.作者:Syw
2.本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
3.如果文中有什么错误,欢迎指出。以免更多的人被误导。