# testm_bak.py
import sys
import time
import os
import traceback
import logging
from pathlib import Path
os.path.join
from zipfile import ZipFile
# Make this file's own directory importable.
sys.path.append(str(Path(__file__).parent.resolve()))
import shutil
# Also make the parent directory importable (presumably so the project-local
# imports further down resolve -- confirm against the repository layout).
sys.path.append(str(Path(__file__).parent.parent.resolve()))
from selenium.common.exceptions import StaleElementReferenceException, SessionNotCreatedException
import pytest
import requests
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import StaleElementReferenceException, SessionNotCreatedException, TimeoutException
from selenium.webdriver.chrome.options import Options
from utils.sim_selenium_tool import ChromeDriverUtil, SeleniumTool
from Data_Process import writelog
# Timestamps and log-file location, computed once at import time.
# The log-file name uses the two-digit-year date stamp (yymmdd).
dtf = time.strftime("%y%m%d", time.localtime())
dtfs = time.strftime("%y%m%d_%H%M%S", time.localtime())
mkpathlog = r"C:\Users\Public\CashFlowAutomationlog"
logfile = "CFAlog" + dtf + ".log"
logfilepath = os.path.join(mkpathlog, logfile)
# dtf is rebound here to the four-digit-year form (YYYYmmdd); `logfile`
# above keeps the two-digit-year stamp it was built with.
dtf = time.strftime("%Y%m%d", time.localtime())
def mainforCFA(filename, mkph):
    """Download one file from the configured SharePoint library into ``mkph``.

    The library URL is the first line of ``downloadConfig.txt``. The row
    check-box whose ``aria-label`` contains ``filename`` is ticked and the
    toolbar download command is clicked. Any exception is printed and written
    to the module log file through ``writelog``; the browser is always closed.

    Args:
        filename: Name (or name fragment) of the file to download.
        mkph: Directory the browser should download into.
    """
    print('src.demo.testm_bak:')
    # Bound before the try so the finally clause is safe even when the
    # SeleniumTool constructor itself raises.
    selenium_tool = None
    try:
        download_dir = mkph
        print('download_dir:')
        print(download_dir)
        selenium_tool = SeleniumTool(is_headless=False, download_dir=download_dir)

        with open(r"C:\Users\Public\pythonAutomationConfig\downloadConfig.txt", "r", encoding='utf-8') as f:
            # readline() keeps the line terminator; drop it from the URL.
            test_sharepoint_url = f.readline().strip()
        download_file_name = filename
        print(f"test_sharepoint_url: {test_sharepoint_url}")
        print("download_file_name:"+download_file_name)
        selenium_tool.load_url(test_sharepoint_url)

        # The mail address is only echoed here; this variant has no login step.
        with open(r"C:\Users\Public\pythonAutomationConfig\mailConfig.txt", "r", encoding='utf-8') as f:
            mailname = f.readline()
        print('mailname:')
        print(mailname)

        checkbox_list_xpath = "//div[@role='gridcell']"
        elements = selenium_tool.query_elements(xpath_str=checkbox_list_xpath, explicit_wait_timeout=10)
        actually_download_list = []
        for e in elements:
            if actually_download_list:
                break
            # find_elements("tag name", ...) works on Selenium 3 and 4;
            # find_elements_by_tag_name was removed in Selenium 4.3.
            for sub_e in e.find_elements("tag name", "div"):
                try:
                    class_name = sub_e.get_attribute('class')
                except Exception:
                    # Element could not be read (e.g. went stale); skip it.
                    continue
                if 'ms-DetailsRow-check' not in (class_name or ''):
                    continue
                label = sub_e.get_attribute('aria-label')
                if label is not None and download_file_name in label:
                    sub_e.click()
                    actually_download_list.append(label)
                    time.sleep(0.2)
                    break

        if actually_download_list:
            download_file = Path(selenium_tool.download_dir).joinpath(download_file_name).resolve()
            print(f"download_file: {download_file}")
            print(f"selenium_tool.download_dir: {selenium_tool.download_dir}")
            # Remove a previous copy before starting the new download.
            if download_file.exists():
                download_file.unlink()
            # Click the expand/collapse control first, then the download command.
            selenium_tool.query_click_button(xpath_str='//button[@data-automationid="expandCollapseList"]')
            selenium_tool.query_click_button(xpath_str='//button[@data-automationid="downloadCommand"]')
            selenium_tool.is_file_downloaded(file_path=download_file)
    except Exception:
        e_info = traceback.format_exc()
        print(e_info)
        print('exception')
        writelog(logfilepath, e_info)
    finally:
        if selenium_tool is not None:
            selenium_tool.force_quit_driver()
def upload_fileforCFA(finaoutputfile):
    """Upload a file from the AdjustmentFile folder to the configured SharePoint library.

    The library URL is the first line of ``uploadConfig.txt``. A failure while
    verifying the upload is printed and written to the module log file through
    ``writelog``. The browser is closed on every exit path.

    Args:
        finaoutputfile: File name, relative to the AdjustmentFile folder.
    """
    selenium_tool = SeleniumTool(is_headless=False)
    try:
        with open(r"C:\Users\Public\pythonAutomationConfig\uploadConfig.txt", "r", encoding='utf-8') as f:
            # readline() keeps the line terminator; drop it from the URL.
            test_sharepoint_url = f.readline().strip()
        selenium_tool.load_url(test_sharepoint_url)

        up_file_path = r'C:\Users\Public\AdjustmentFile'
        file_name = finaoutputfile
        upload_file_name = os.path.join(up_file_path, file_name)
        print('upload_file_name: '+upload_file_name)
        upload_button_xpath = "//button[@name='Upload']"
        upload_file_button_xpath = "//button[@name='Files']"
        # upload_file_name is absolute, so joinpath() yields it unchanged.
        local_filepath = Path(__file__).parent.joinpath(upload_file_name).resolve()

        time.sleep(5)
        selenium_tool.query_click_button(xpath_str=upload_button_xpath)
        selenium_tool.query_click_button(xpath_str=upload_file_button_xpath)
        selenium_tool.windows_utils.input_local_filepath_for_native_window_win32gui(filepath=str(local_filepath))
        time.sleep(0.2)

        replace_button_xpath = '//div[@role="alert"]//button[@name="Replace"]'
        try:
            selenium_tool.query_click_button(xpath_str=replace_button_xpath, explicit_wait_timeout=5)
        except TimeoutException:
            # NOTE(review): the driver is quit here yet the upload check below
            # still runs -- confirm this is intended.
            selenium_tool.force_quit_driver()
            print('timeout')
        time.sleep(0.1)

        file_name = file_name.split('\\')[-1]
        print(f"file_name {file_name}")
        try:
            selenium_tool.is_file_uploaded(filename=file_name)
        except Exception:
            e_info = traceback.format_exc()
            print(f"{file_name} == {e_info}")
            writelog(logfilepath, e_info)
    finally:
        # Covers the whole flow so a failing earlier step cannot leak the browser.
        selenium_tool.force_quit_driver()
    print(f"upload complete {file_name}")
def main(filename, mkph):
    """Download one file from the configured SharePoint library into ``mkph``.

    The library URL is the first line of ``downloadConfig.txt``. If an e-mail
    input is present on the page, the first line of ``mailConfig.txt`` is typed
    into it and the submit button is clicked. The row check-box whose
    ``aria-label`` contains ``filename`` is then ticked and the download
    command is clicked. Exceptions are printed, not raised; the browser is
    always closed.

    Args:
        filename: Name (or name fragment) of the file to download.
        mkph: Directory the browser should download into.
    """
    print('src.demo.testm_bak:')
    # Bound before the try so the finally clause is safe even when the
    # SeleniumTool constructor itself raises.
    selenium_tool = None
    try:
        download_dir = mkph
        print('download_dir:')
        print(download_dir)
        selenium_tool = SeleniumTool(is_headless=False, download_dir=download_dir)

        with open(r"C:\Users\Public\pythonAutomationConfig\downloadConfig.txt", "r", encoding='utf-8') as f:
            # readline() keeps the line terminator; drop it from the URL.
            test_sharepoint_url = f.readline().strip()
        download_file_name = filename
        print(f"test_sharepoint_url: {test_sharepoint_url}")
        print("download_file_name:"+download_file_name)
        selenium_tool.load_url(test_sharepoint_url)

        validate_account_xpath = '//input[@type="email"]'
        elements = selenium_tool.query_elements(xpath_str=validate_account_xpath)
        print('elements:')
        print(elements)
        with open(r"C:\Users\Public\pythonAutomationConfig\mailConfig.txt", "r", encoding='utf-8') as f:
            # NOTE(review): the line terminator is kept and therefore typed
            # into the e-mail field as well -- confirm this is wanted.
            mailname = f.readline()
        print('mailname:')
        print(mailname)
        if len(elements) > 0:
            selenium_tool.input_text_element(text_str=mailname, xpath_str=validate_account_xpath)
            next_button_xpath = '//input[@type="submit"]'
            selenium_tool.query_click_button(xpath_str=next_button_xpath)

        checkbox_list_xpath = "//div[@role='gridcell']"
        elements = selenium_tool.query_elements(xpath_str=checkbox_list_xpath, explicit_wait_timeout=10)
        actually_download_list = []
        for e in elements:
            if actually_download_list:
                break
            # find_elements("tag name", ...) works on Selenium 3 and 4;
            # find_elements_by_tag_name was removed in Selenium 4.3.
            for sub_e in e.find_elements("tag name", "div"):
                try:
                    class_name = sub_e.get_attribute('class')
                except Exception:
                    # Element could not be read (e.g. went stale); skip it.
                    continue
                if 'ms-DetailsRow-check' not in (class_name or ''):
                    continue
                label = sub_e.get_attribute('aria-label')
                if label is not None and download_file_name in label:
                    sub_e.click()
                    actually_download_list.append(label)
                    time.sleep(0.2)
                    break

        if actually_download_list:
            download_file = Path(selenium_tool.download_dir).joinpath(download_file_name).resolve()
            print(f"download_file: {download_file}")
            print(f"selenium_tool.download_dir: {selenium_tool.download_dir}")
            # Remove a previous copy before starting the new download.
            if download_file.exists():
                download_file.unlink()
            # Click the expand/collapse control first, then the download command.
            selenium_tool.query_click_button(xpath_str='//button[@data-automationid="expandCollapseList"]')
            selenium_tool.query_click_button(xpath_str='//button[@data-automationid="downloadCommand"]')
            selenium_tool.is_file_downloaded(file_path=download_file)
    except Exception:
        e_info = traceback.format_exc()
        print(e_info)
        print('exception')
    finally:
        if selenium_tool is not None:
            selenium_tool.force_quit_driver()
def main_bak():
    """Legacy download routine with a hard-coded folder, URL, account and file.

    Downloads ``1.txt`` from the hard-coded SharePoint ``test1`` folder into
    ``C:/wz/pythonFile``. Exceptions are printed, not raised; the browser is
    always closed.
    """
    # Bound before the try so the finally clause is safe even when the
    # SeleniumTool constructor itself raises.
    selenium_tool = None
    try:
        download_dir = r"C:\wz\pythonFile"
        selenium_tool = SeleniumTool(is_headless=False, download_dir=download_dir)
        test_sharepoint_url = "https://aiacom.sharepoint.com/sites/EASteam/Shared%20Documents/Forms/AllItems.aspx?id=%2Fsites%2FEASteam%2FShared%20Documents%2Ftest1&viewid=dbf4c9a2%2Ddf5a%2D4917%2Db2ff%2Da317c21f608a"
        download_file_name = '1.txt'
        print("download_file_name:"+download_file_name)
        selenium_tool.load_url(test_sharepoint_url)

        validate_account_xpath = '//input[@type="email"]'
        elements = selenium_tool.query_elements(xpath_str=validate_account_xpath)
        if len(elements) > 0:
            selenium_tool.input_text_element(text_str="Eoin-Z.Wang@aia.com", xpath_str=validate_account_xpath)
            next_button_xpath = '//input[@type="submit"]'
            selenium_tool.query_click_button(xpath_str=next_button_xpath)

        checkbox_list_xpath = "//div[@role='gridcell']"
        elements = selenium_tool.query_elements(xpath_str=checkbox_list_xpath, explicit_wait_timeout=10)
        selected = False
        for e in elements:
            if selected:
                break
            # find_elements("tag name", ...) works on Selenium 3 and 4;
            # find_elements_by_tag_name was removed in Selenium 4.3.
            for sub_e in e.find_elements("tag name", "div"):
                label = sub_e.get_attribute('aria-label')
                if label is not None and download_file_name in label:
                    sub_e.click()
                    selected = True
                    time.sleep(0.3)
                    break

        # Unlike the newer variants, the download is attempted even when no
        # matching row was ticked.
        download_file = Path(selenium_tool.download_dir).joinpath(download_file_name).resolve()
        print(f"download_file: {download_file}")
        print(f"selenium_tool.download_dir: {selenium_tool.download_dir}")
        if download_file.exists():
            download_file.unlink()
        xpath_str = '//button[@data-automationid="downloadCommand"]'
        selenium_tool.query_click_button(xpath_str=xpath_str)
        selenium_tool.is_file_downloaded(file_path=download_file)
    except Exception:
        # Print the traceback so failures are diagnosable, as `main` does.
        print(traceback.format_exc())
        print('exception')
    finally:
        if selenium_tool is not None:
            selenium_tool.force_quit_driver()
def download_files_batch(filename):
    """Download ``Mapping Table.xlsx`` and ``filename`` as one zip and extract both.

    Rows whose ``aria-label`` equals one of the two names are ticked in the
    hard-coded SharePoint ``test1`` folder. The zip is expected as
    ``OneDrive_1_<mm-dd-YYYY>.zip`` in this file's directory, and both names
    are extracted from it into ``C:/Users/Public/pythonFile``.

    Args:
        filename: Name of the second file to download.

    Returns:
        None. Returns early when no grid cells are found.
    """
    download_dir = Path(__file__).parent.resolve()
    selenium_tool = SeleniumTool(is_headless=False, download_dir=download_dir)
    try:
        download_file_name_list = ['Mapping Table.xlsx', filename]
        print(download_file_name_list)
        selenium_tool.load_url("https://aiacom.sharepoint.com/sites/EASteam/Shared%20Documents/Forms/AllItems.aspx?id=%2Fsites%2FEASteam%2FShared%20Documents%2Ftest1&viewid=dbf4c9a2%2Ddf5a%2D4917%2Db2ff%2Da317c21f608a")

        validate_account_xpath = '//input[@type="email"]'
        elements = selenium_tool.query_elements(xpath_str=validate_account_xpath)
        if len(elements) > 0:
            selenium_tool.input_text_element(text_str="Eoin-Z.Wang@aia.com", xpath_str=validate_account_xpath)
            next_button_xpath = '//input[@type="submit"]'
            selenium_tool.query_click_button(xpath_str=next_button_xpath)

        checkbox_list_xpath = "//div[@role='gridcell']"
        elements = selenium_tool.query_elements(xpath_str=checkbox_list_xpath, explicit_wait_timeout=10)
        if elements is None:
            return

        for e in elements:
            # find_elements("tag name", ...) works on Selenium 3 and 4;
            # find_elements_by_tag_name was removed in Selenium 4.3.
            for sub_e in e.find_elements("tag name", "div"):
                print(f"sub_e: {sub_e}")
                if sub_e.get_attribute('aria-label') in download_file_name_list:
                    try:
                        sub_e.click()
                        time.sleep(0.3)
                    except Exception:
                        # Best effort: report the failed click and keep scanning.
                        print(f"click failed: {traceback.format_exc()}")

        download_file = Path(selenium_tool.download_dir).joinpath(
            "OneDrive_1_" + time.strftime('%m-%d-%Y') + ".zip").resolve()
        # Remove a previous archive before starting the new download.
        if download_file.exists():
            download_file.unlink()
        xpath_str = '//button[@data-automationid="downloadCommand"]'
        selenium_tool.query_click_button(xpath_str=xpath_str)
        selenium_tool.is_file_downloaded(file_path=download_file)

        target_file_path = r"C:\Users\Public\pythonFile"
        # The context manager closes the archive handle after extraction.
        with ZipFile(download_file, 'r') as zip_file:
            for file_name in download_file_name_list:
                zip_file.extract(file_name, path=target_file_path)
    finally:
        # The original never closed the browser; release it on every exit path.
        selenium_tool.force_quit_driver()
def download_files_batch1(filename, mkph):
    """Download ``Mapping Table.xlsx`` and ``filename`` into ``C:/Users/Public/pythonFile``.

    Rows whose ``aria-label`` equals one of the two names are ticked in the
    hard-coded SharePoint ``test`` folder. If exactly one row was ticked, the
    download is expected under that row's label. Otherwise it is expected as
    ``OneDrive_1_<mm-dd-YYYY>.zip``, and when more than one row was ticked
    both names are extracted from it.

    Args:
        filename: Name of the second file to download.
        mkph: Unused; kept for interface compatibility.

    Returns:
        None. Returns early when no grid cells are found.
    """
    target_file_path = r"C:\Users\Public\pythonFile"
    selenium_tool = SeleniumTool(is_headless=False, download_dir=target_file_path)
    try:
        download_file_name_list = ['Mapping Table.xlsx', filename]
        selenium_tool.load_url("https://aiacom.sharepoint.com/sites/EASteam/Shared%20Documents/Forms/AllItems.aspx?id=%2Fsites%2FEASteam%2FShared%20Documents%2Ftest&viewid=dbf4c9a2%2Ddf5a%2D4917%2Db2ff%2Da317c21f608a")

        validate_account_xpath = '//input[@type="email"]'
        elements = selenium_tool.query_elements(xpath_str=validate_account_xpath)
        if len(elements) > 0:
            selenium_tool.input_text_element(text_str="Eoin-Z.Wang@aia.com", xpath_str=validate_account_xpath)
            next_button_xpath = '//input[@type="submit"]'
            selenium_tool.query_click_button(xpath_str=next_button_xpath)

        checkbox_list_xpath = "//div[@role='gridcell']"
        elements = selenium_tool.query_elements(xpath_str=checkbox_list_xpath, explicit_wait_timeout=10)
        actually_downloaded_file_list = []
        if elements is None:
            return

        for i, e in enumerate(elements):
            # find_elements("tag name", ...) works on Selenium 3 and 4;
            # find_elements_by_tag_name was removed in Selenium 4.3.
            for sub_e in e.find_elements("tag name", "div"):
                print(f"i for element: {i}")
                if sub_e.get_attribute('aria-label') in download_file_name_list:
                    try:
                        sub_e.click()
                        time.sleep(1)
                        actually_downloaded_file_list.append(sub_e.get_attribute('aria-label'))
                    except Exception:
                        # Best effort: report the failed click and keep scanning.
                        print(f"click failed: {traceback.format_exc()}")

        if len(actually_downloaded_file_list) == 1:
            download_file = Path(selenium_tool.download_dir).joinpath(actually_downloaded_file_list[0]).resolve()
        else:
            download_file = Path(selenium_tool.download_dir).joinpath(
                "OneDrive_1_" + time.strftime('%m-%d-%Y') + ".zip").resolve()
        # Remove a previous copy before starting the new download.
        if download_file.exists():
            download_file.unlink()
        xpath_str = '//button[@data-automationid="downloadCommand"]'
        selenium_tool.query_click_button(xpath_str=xpath_str)
        selenium_tool.is_file_downloaded(file_path=download_file)

        if len(actually_downloaded_file_list) > 1:
            # The context manager closes the archive handle after extraction.
            with ZipFile(download_file, 'r') as zip_file:
                for file_name in download_file_name_list:
                    zip_file.extract(file_name, path=target_file_path)
    finally:
        # The original never closed the browser; release it on every exit path.
        selenium_tool.force_quit_driver()
def download_files_batch_bak(filename):
    """Download ``Mapping Table.xlsx`` and ``filename``, then move the mapping table.

    Files are downloaded from the hard-coded Cashflowfileupload ``test``
    folder into ``C:/Users/Public/pythonFile``. When more than one row was
    ticked the zip is extracted there. ``Mapping Table.xlsx`` is then moved
    to ``C:/Users/Public/pythonFile1``.

    Args:
        filename: Name of the second file to download.

    Returns:
        None. Returns early when no grid cells are found.
    """
    target_file_path = r"C:\Users\Public\pythonFile"
    selenium_tool = SeleniumTool(is_headless=False, download_dir=target_file_path)
    try:
        download_file_name_list = ['Mapping Table.xlsx']
        download_file_name_list.append(filename)
        print('download_file_name_list:11')
        print(download_file_name_list)
        selenium_tool.load_url("https://aiacom.sharepoint.com/sites/Cashflowfileupload/Shared%20Documents/Forms/AllItems.aspx?FolderCTID=0x0120006D8365F3F6FA34409FEFCC9C266CDA51&id=%2Fsites%2FCashflowfileupload%2FShared%20Documents%2Ftest&viewid=8a7b63cc%2Dde2c%2D4086%2Dbb9e%2D5f9943d8302b")

        validate_account_xpath = '//input[@type="email"]'
        elements = selenium_tool.query_elements(xpath_str=validate_account_xpath)
        if len(elements) > 0:
            selenium_tool.input_text_element(text_str="Eoin-Z.Wang@aia.com", xpath_str=validate_account_xpath)
            next_button_xpath = '//input[@type="submit"]'
            selenium_tool.query_click_button(xpath_str=next_button_xpath)

        checkbox_list_xpath = "//div[@role='gridcell']"
        elements = selenium_tool.query_elements(xpath_str=checkbox_list_xpath, explicit_wait_timeout=10)
        actually_downloaded_file_list = []
        if elements is None:
            return

        for i, e in enumerate(elements):
            # find_elements("tag name", ...) works on Selenium 3 and 4;
            # find_elements_by_tag_name was removed in Selenium 4.3.
            for sub_e in e.find_elements("tag name", "div"):
                print(f"i for element: {i}")
                if sub_e.get_attribute('aria-label') in download_file_name_list:
                    try:
                        sub_e.click()
                        time.sleep(1)
                        actually_downloaded_file_list.append(sub_e.get_attribute('aria-label'))
                    except Exception:
                        # Best effort: report the failed click and keep scanning.
                        print(f"click failed: {traceback.format_exc()}")

        if len(actually_downloaded_file_list) == 1:
            download_file = Path(selenium_tool.download_dir).joinpath(actually_downloaded_file_list[0]).resolve()
        else:
            download_file = Path(selenium_tool.download_dir).joinpath(
                "OneDrive_1_" + time.strftime('%m-%d-%Y') + ".zip").resolve()
        # Remove a previous copy before starting the new download.
        if download_file.exists():
            download_file.unlink()
        xpath_str = '//button[@data-automationid="downloadCommand"]'
        selenium_tool.query_click_button(xpath_str=xpath_str)
        selenium_tool.is_file_downloaded(file_path=download_file)

        if len(actually_downloaded_file_list) > 1:
            # The context manager closes the archive handle after extraction.
            with ZipFile(download_file, 'r') as zip_file:
                for file_name in download_file_name_list:
                    zip_file.extract(file_name, path=target_file_path)
        time.sleep(0.3)

        orignalfile = r'C:\Users\Public\pythonFile\Mapping Table.xlsx'
        targetfilepath = r'C:\Users\Public\pythonFile1'
        # Only move the mapping table when it is actually present, so a
        # missing file no longer aborts the function after a good download.
        if os.path.exists(orignalfile):
            shutil.move(orignalfile, targetfilepath)
        time.sleep(0.1)
        # Remove any copy still left in the download folder after the move.
        if os.path.exists(orignalfile):
            os.remove(orignalfile)
    finally:
        # The original never closed the browser; release it on every exit path.
        selenium_tool.force_quit_driver()
def download_files_batch_bakforOnefile(filename):
    """Download a single file from the hard-coded Cashflowfileupload ``test`` folder.

    The row whose ``aria-label`` equals ``filename`` is ticked and the file is
    downloaded into ``C:/Users/Public/pythonFile``. Scanning is abandoned once
    the grid-cell index exceeds 102.

    Args:
        filename: Name of the file to download.

    Returns:
        None. Returns early when no grid cells are found or the scan limit
        is exceeded.
    """
    target_file_path = r"C:\Users\Public\pythonFile"
    selenium_tool = SeleniumTool(is_headless=False, download_dir=target_file_path)
    try:
        download_file_name_list = [filename]
        selenium_tool.load_url("https://aiacom.sharepoint.com/sites/Cashflowfileupload/Shared%20Documents/Forms/AllItems.aspx?FolderCTID=0x0120006D8365F3F6FA34409FEFCC9C266CDA51&id=%2Fsites%2FCashflowfileupload%2FShared%20Documents%2Ftest&viewid=8a7b63cc%2Dde2c%2D4086%2Dbb9e%2D5f9943d8302b")

        validate_account_xpath = '//input[@type="email"]'
        elements = selenium_tool.query_elements(xpath_str=validate_account_xpath)
        if len(elements) > 0:
            selenium_tool.input_text_element(text_str="Eoin-Z.Wang@aia.com", xpath_str=validate_account_xpath)
            next_button_xpath = '//input[@type="submit"]'
            selenium_tool.query_click_button(xpath_str=next_button_xpath)

        checkbox_list_xpath = "//div[@role='gridcell']"
        elements = selenium_tool.query_elements(xpath_str=checkbox_list_xpath, explicit_wait_timeout=10)
        actually_downloaded_file_list = []
        if elements is None:
            return

        for i, e in enumerate(elements):
            # find_elements("tag name", ...) works on Selenium 3 and 4;
            # find_elements_by_tag_name was removed in Selenium 4.3.
            for sub_e in e.find_elements("tag name", "div"):
                print(f"i for element: {i}")
                if i > 102:
                    # The original quit the driver here but kept iterating
                    # elements of the dead session. Stop cleanly instead; the
                    # finally clause closes the browser.
                    return
                if sub_e.get_attribute('aria-label') in download_file_name_list:
                    try:
                        sub_e.click()
                        time.sleep(1)
                        actually_downloaded_file_list.append(sub_e.get_attribute('aria-label'))
                    except Exception:
                        # Best effort: report the failed click and keep scanning.
                        print(f"click failed: {traceback.format_exc()}")

        if len(actually_downloaded_file_list) == 1:
            download_file = Path(selenium_tool.download_dir).joinpath(actually_downloaded_file_list[0]).resolve()
        else:
            download_file = Path(selenium_tool.download_dir).joinpath(
                "OneDrive_1_" + time.strftime('%m-%d-%Y') + ".zip").resolve()
        # Remove a previous copy before starting the new download.
        if download_file.exists():
            download_file.unlink()
        xpath_str = '//button[@data-automationid="downloadCommand"]'
        selenium_tool.query_click_button(xpath_str=xpath_str)
        selenium_tool.is_file_downloaded(file_path=download_file)

        if len(actually_downloaded_file_list) > 1:
            # The context manager closes the archive handle after extraction.
            with ZipFile(download_file, 'r') as zip_file:
                for file_name in download_file_name_list:
                    zip_file.extract(file_name, path=target_file_path)
    finally:
        # Release the browser on every exit path.
        selenium_tool.force_quit_driver()
def upload_file(finaoutputfile):
    """Upload a file from the AdjustmentFile folder to the configured SharePoint library.

    The library URL is the first line of ``uploadConfig.txt``. If an e-mail
    input is present on the page, the first line of ``mailConfig.txt`` is typed
    into it and the submit button is clicked. A failure while verifying the
    upload is printed, not raised. The browser is closed on every exit path.

    Args:
        finaoutputfile: File name, relative to the AdjustmentFile folder.
    """
    selenium_tool = SeleniumTool(is_headless=False)
    try:
        with open(r"C:\Users\Public\pythonAutomationConfig\uploadConfig.txt", "r", encoding='utf-8') as f:
            # readline() keeps the line terminator; drop it from the URL.
            test_sharepoint_url = f.readline().strip()
        selenium_tool.load_url(test_sharepoint_url)

        validate_account_xpath = '//input[@type="email"]'
        elements = selenium_tool.query_elements(xpath_str=validate_account_xpath)
        with open(r"C:\Users\Public\pythonAutomationConfig\mailConfig.txt", "r", encoding='utf-8') as f:
            # NOTE(review): the line terminator is kept and therefore typed
            # into the e-mail field as well -- confirm this is wanted.
            mailname = f.readline()
        if len(elements) > 0:
            selenium_tool.input_text_element(text_str=mailname, xpath_str=validate_account_xpath)
            next_button_xpath = '//input[@type="submit"]'
            selenium_tool.query_click_button(xpath_str=next_button_xpath)

        up_file_path = r'C:\Users\Public\AdjustmentFile'
        file_name = finaoutputfile
        upload_file_name = os.path.join(up_file_path, file_name)
        print('upload_file_name: '+upload_file_name)
        upload_button_xpath = "//button[@name='Upload']"
        upload_file_button_xpath = "//button[@name='Files']"
        # upload_file_name is absolute, so joinpath() yields it unchanged.
        local_filepath = Path(__file__).parent.joinpath(upload_file_name).resolve()

        selenium_tool.query_click_button(xpath_str=upload_button_xpath)
        selenium_tool.query_click_button(xpath_str=upload_file_button_xpath)
        selenium_tool.windows_utils.input_local_filepath_for_native_window_win32gui(filepath=str(local_filepath))
        time.sleep(0.2)

        replace_button_xpath = '//div[@role="alert"]//button[@name="Replace"]'
        try:
            selenium_tool.query_click_button(xpath_str=replace_button_xpath, explicit_wait_timeout=5)
        except TimeoutException:
            # NOTE(review): the driver is quit here yet the upload check below
            # still runs -- confirm this is intended.
            selenium_tool.force_quit_driver()
            print('timeout')
        time.sleep(0.2)

        file_name = file_name.split('\\')[-1]
        print(f"file_name {file_name}")
        try:
            selenium_tool.is_file_uploaded(filename=file_name)
        except Exception:
            e_info = traceback.format_exc()
            print(f"{file_name} == {e_info}")
    finally:
        # Covers the whole flow so a failing earlier step cannot leak the browser.
        selenium_tool.force_quit_driver()
    print(f"upload complete {file_name}")
def upload_file1130(finaoutputfile):
    """Upload a file from ``C:/Users/Public/pythonFile1`` to the hard-coded UploadFile folder.

    Unlike ``upload_file``, the SharePoint URL and the account are hard-coded
    and errors from the upload check propagate to the caller. The browser is
    closed on every exit path.

    Args:
        finaoutputfile: File name, relative to the pythonFile1 folder.
    """
    selenium_tool = SeleniumTool(is_headless=False)
    try:
        test_sharepoint_url = "https://aiacom.sharepoint.com/sites/Cashflowfileupload/Shared%20Documents/Forms/AllItems.aspx?FolderCTID=0x0120006D8365F3F6FA34409FEFCC9C266CDA51&id=%2Fsites%2FCashflowfileupload%2FShared%20Documents%2FUploadFile&viewid=8a7b63cc%2Dde2c%2D4086%2Dbb9e%2D5f9943d8302b"
        selenium_tool.load_url(test_sharepoint_url)

        validate_account_xpath = '//input[@type="email"]'
        elements = selenium_tool.query_elements(xpath_str=validate_account_xpath)
        if len(elements) > 0:
            selenium_tool.input_text_element(text_str="Eoin-Z.Wang@aia.com", xpath_str=validate_account_xpath)
            next_button_xpath = '//input[@type="submit"]'
            selenium_tool.query_click_button(xpath_str=next_button_xpath)

        up_file_path = r'C:\Users\Public\pythonFile1'
        file_name = finaoutputfile
        upload_file_name = os.path.join(up_file_path, file_name)
        print('upload_file_name: '+upload_file_name)
        upload_button_xpath = "//button[@name='Upload']"
        upload_file_button_xpath = "//button[@name='Files']"
        # upload_file_name is absolute, so joinpath() yields it unchanged.
        local_filepath = Path(__file__).parent.joinpath(upload_file_name).resolve()

        selenium_tool.query_click_button(xpath_str=upload_button_xpath)
        selenium_tool.query_click_button(xpath_str=upload_file_button_xpath)
        selenium_tool.windows_utils.input_local_filepath_for_native_window_win32gui(filepath=str(local_filepath))
        time.sleep(0.3)

        replace_button_xpath = '//div[@role="alert"]//button[@name="Replace"]'
        try:
            selenium_tool.query_click_button(xpath_str=replace_button_xpath, explicit_wait_timeout=5)
        except TimeoutException:
            # NOTE(review): the driver is quit here yet the upload check below
            # still runs -- confirm this is intended.
            selenium_tool.force_quit_driver()
            print('timeout')
        time.sleep(0.2)

        file_name = file_name.split('\\')[-1]
        selenium_tool.is_file_uploaded(filename=file_name)
    finally:
        # Close the browser even when a step above raises.
        selenium_tool.force_quit_driver()
# import shutil
# # finaoutputfile
# orignalfilepath = r'C:\Users\Public\pythonFile1'
# orignalfile = os.path.join(orignalfilepath,finaoutputfile)
# targetfilepath = r'C:\wz\Moses'
# print('start to move file')
# shutil.move(orignalfile, targetfilepath)
# Finally, delete the files
# time.sleep(0.5)
# print('start to remove file')
# for root, dirs, files in os.walk(r"C:\Users\Public\pythonFile"):
# for name in files:
# if name.endswith(".xlsx"): # file format to delete (the original note said png); can be changed to another format
# os.remove(os.path.join(root, name))
# print("Delete File: " + os.path.join(root, name))
# if name.endswith(".csv"): # file format to delete (the original note said png); can be changed to another format
# os.remove(os.path.join(root, name))
# print("Delete File: " + os.path.join(root, name))
# # if name.endswith(".txt"): # file format to delete (the original note said png); can be changed to another format
# # os.remove(os.path.join(root, name))
# # print("Delete File: " + os.path.join(root, name))
def upload_file_bak1():
    """Upload the hard-coded workbook ``3333.xlsx`` to the test SharePoint folder.

    Opens the SharePoint document library, fills in the sign-in e-mail when
    the prompt is shown, drives the "Upload > Files" menu, types the local
    path into the native file dialog, confirms the optional "Replace"
    prompt and finally waits until the file shows up in the file list.

    The browser is always closed on exit, including when a step raises.
    """
    # is_headless=False
    selenium_tool = SeleniumTool(is_headless=False)
    try:
        test_sharepoint_url = "https://aiacom.sharepoint.com/sites/EASteam/Shared%20Documents/Forms/AllItems.aspx?id=%2Fsites%2FEASteam%2FShared%20Documents%2Ftest&viewid=dbf4c9a2%2Ddf5a%2D4917%2Db2ff%2Da317c21f608a"
        selenium_tool.load_url(test_sharepoint_url)

        validate_account_xpath = '//input[@type="email"]'
        # query_elements() waits for visible matches and raises
        # TimeoutException when there are none, so "no sign-in prompt" has to
        # be translated into an empty result for the check below to be useful.
        try:
            elements = selenium_tool.query_elements(xpath_str=validate_account_xpath)
        except TimeoutException:
            elements = []
        if elements:
            selenium_tool.input_text_element(text_str="Eoin-Z.Wang@aia.com", xpath_str=validate_account_xpath)
            next_button_xpath = '//input[@type="submit"]'
            selenium_tool.query_click_button(xpath_str=next_button_xpath)

        upload_file_name = r'C:\wz\pythonFile\3333.xlsx'
        upload_button_xpath = "//button[@name='Upload']"
        upload_file_button_xpath = "//button[@name='Files']"
        # upload_file_name is absolute, so joinpath() simply yields that path.
        local_filepath = Path(__file__).parent.joinpath(upload_file_name).resolve()
        selenium_tool.query_click_button(xpath_str=upload_button_xpath)
        selenium_tool.query_click_button(xpath_str=upload_file_button_xpath)
        selenium_tool.windows_utils.input_local_filepath_for_native_window_win32gui(filepath=str(local_filepath))
        time.sleep(1)

        # The "Replace" alert is only clicked if it becomes clickable within
        # 5 seconds; a timeout here is not treated as an error.
        replace_button_xpath = '//div[@role="alert"]//button[@name="Replace"]'
        try:
            selenium_tool.query_click_button(xpath_str=replace_button_xpath, explicit_wait_timeout=5)
        except TimeoutException:
            print('timeout')
        time.sleep(1)
        selenium_tool.is_file_uploaded(filename='3333.xlsx')
    finally:
        # Never leave a Chrome/chromedriver process behind.
        selenium_tool.force_quit_driver()
def upload_file_bak():
    """Upload the hard-coded workbook ``3333.xlsx`` to the test SharePoint folder.

    Opens the SharePoint document library, fills in the sign-in e-mail when
    the prompt is shown, drives the "Upload > Files" menu, types the local
    path into the native file dialog, confirms the optional "Replace"
    prompt and finally waits until the file shows up in the file list.

    The browser is always closed on exit, including when a step raises.
    """
    # is_headless=False
    selenium_tool = SeleniumTool(is_headless=False)
    try:
        test_sharepoint_url = "https://aiacom.sharepoint.com/sites/EASteam/Shared%20Documents/Forms/AllItems.aspx?id=%2Fsites%2FEASteam%2FShared%20Documents%2Ftest&viewid=dbf4c9a2%2Ddf5a%2D4917%2Db2ff%2Da317c21f608a"
        selenium_tool.load_url(test_sharepoint_url)

        validate_account_xpath = '//input[@type="email"]'
        # query_elements() waits for visible matches and raises
        # TimeoutException when there are none, so "no sign-in prompt" has to
        # be translated into an empty result for the check below to be useful.
        try:
            elements = selenium_tool.query_elements(xpath_str=validate_account_xpath)
        except TimeoutException:
            elements = []
        if elements:
            selenium_tool.input_text_element(text_str="Eoin-Z.Wang@aia.com", xpath_str=validate_account_xpath)
            next_button_xpath = '//input[@type="submit"]'
            selenium_tool.query_click_button(xpath_str=next_button_xpath)

        upload_file_name = r'C:\wz\pythonFile\3333.xlsx'
        upload_button_xpath = "//button[@name='Upload']"
        upload_file_button_xpath = "//button[@name='Files']"
        # upload_file_name is absolute, so joinpath() simply yields that path.
        local_filepath = Path(__file__).parent.joinpath(upload_file_name).resolve()
        selenium_tool.query_click_button(xpath_str=upload_button_xpath)
        selenium_tool.query_click_button(xpath_str=upload_file_button_xpath)
        selenium_tool.windows_utils.input_local_filepath_for_native_window_win32gui(filepath=str(local_filepath))
        time.sleep(1)

        # The "Replace" alert is only clicked if it becomes clickable within
        # 5 seconds; a timeout here is not treated as an error.
        replace_button_xpath = '//div[@role="alert"]//button[@name="Replace"]'
        try:
            selenium_tool.query_click_button(xpath_str=replace_button_xpath, explicit_wait_timeout=5)
        except TimeoutException:
            print('timeout')
        time.sleep(1)
        selenium_tool.is_file_uploaded(filename='3333.xlsx')
    finally:
        # Never leave a Chrome/chromedriver process behind.
        selenium_tool.force_quit_driver()
# Script entry point: only runs when this file is executed directly.
if __name__ == '__main__':
    # Earlier manual entry points, kept for reference:
    # print(111)
    # download_files_batch_bakforOnefile('Philippines_2021-11-24.txt')
    # download_files_batch_bak('1.txt')
    # download_files_batch_bak()
    # upload_file()
    # main('fork1.txt',r'C:\wz\pythonFile')
    # main_bak()
    upload_fileforCFA('U110_Cashflow_AIA_PH_20220321160533.csv')
sim_selenium_tool.py
import os
import re
import sys
import time
from zipfile import ZipFile
from pathlib import Path
from datetime import datetime
from typing import List, Optional
from pathlib import Path
# Put this file's directory and its parent directory on the import path.
sys.path.append(str(Path(__file__).parent.resolve()))
sys.path.append(str(Path(__file__).parent.parent.resolve()))
import win32gui
import win32api
import warnings
import winreg
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from win32con import WM_INPUTLANGCHANGEREQUEST, WM_SETTEXT, WM_COMMAND
import win32com.client
import requests
from selenium.webdriver import Chrome, ChromeOptions
from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.common.alert import Alert
from selenium.webdriver.common.action_chains import ActionChains
from selenium.common.exceptions import StaleElementReferenceException, SessionNotCreatedException, TimeoutException
class DownloadTimeoutException(Exception):
    """Raised when an expected downloaded file does not exist before the timeout."""
class UploadInputLocalPathFailException(Exception):
    """Raised when a local file path cannot be typed into the native upload dialog."""
class WindowsUtils:
    """Helpers that drive native Windows dialogs through pywin32."""

    @staticmethod
    def set_english_keyboard_layout() -> bool:
        """Ask the foreground window to switch its input language to 0x0409.

        :return: True when the SendMessage call returned 0
        """
        foreground_window = win32gui.GetForegroundWindow()
        en_us_layout = 0x0409
        return win32api.SendMessage(foreground_window, WM_INPUTLANGCHANGEREQUEST, 0, en_us_layout) == 0

    @classmethod
    def input_local_filepath_for_native_window_win32gui(cls, filepath: str) -> None:
        """Type a local file path into the native "Open" dialog and confirm it.

        The path is written into the dialog's edit control with WM_SETTEXT and
        the dialog is then sent a WM_COMMAND for its first button.

        :param filepath: local path to place in the dialog's edit field
        :return: None
        :raises UploadInputLocalPathFailException: when the keyboard layout
            could not be switched
        """
        if not cls.set_english_keyboard_layout():
            raise UploadInputLocalPathFailException(filepath)

        # Must be bigger than 1 second, otherwise the path is not entered completely.
        settle_seconds = 2
        time.sleep(settle_seconds)

        # Walk down the dialog's control tree: dialog -> ComboBoxEx32 -> ComboBox -> Edit.
        open_dialog = win32gui.FindWindow('#32770', 'Open')
        combo_ex = win32gui.FindWindowEx(open_dialog, 0, 'ComboBoxEx32', None)
        combo = win32gui.FindWindowEx(combo_ex, 0, 'ComboBox', None)
        path_edit = win32gui.FindWindowEx(combo, 0, 'Edit', None)
        confirm_button = win32gui.FindWindowEx(open_dialog, 0, 'Button', None)

        win32gui.SendMessage(path_edit, WM_SETTEXT, None, filepath)
        win32gui.SendMessage(open_dialog, WM_COMMAND, 1, confirm_button)

    @classmethod
    def input_local_filepath_for_native_window_win32com(cls, filepath: str):
        """Type a local file path into the focused window using WScript.Shell key strokes.

        :param filepath: local path to type; it is normalised through ``Path`` first
        :raises UploadInputLocalPathFailException: when the keyboard layout
            could not be switched
        """
        if not cls.set_english_keyboard_layout():
            raise UploadInputLocalPathFailException(filepath)

        time.sleep(1)
        wscript_shell = win32com.client.Dispatch("WScript.Shell")
        wscript_shell.Sendkeys(str(Path(filepath)))
        # '~' is the second key sequence sent, right after the path.
        wscript_shell.Sendkeys('~')
class ChromeDriverUtil(object):
    """Download and unpack a chromedriver build for a given Chrome version.

    Downloads go to ``<web_driver_url>/<chrome_version>/<web_driver_url_endpoint>``
    through the proxies configured in ``__init__``.
    """

    # Preferred source: the version quoted after "Current browser version is".
    _BROWSER_VERSION_RE = re.compile(r'Current browser version is\s+(\d+(?:\.\d+){3})')
    # Fallback: the first dotted four-part version number anywhere in the text.
    _ANY_VERSION_RE = re.compile(r'\d+(?:\.\d+){3}')

    def __init__(self,
                 web_driver_url: Optional[str] = None,
                 web_driver_url_endpoint: Optional[str] = None,
                 driver_path: Optional[str] = None,
                 chrome_version: Optional[str] = None):
        """Store the download settings.

        :param web_driver_url: base URL of the driver storage; defaults to
            ``https://chromedriver.storage.googleapis.com``
        :param web_driver_url_endpoint: archive name appended to the URL;
            defaults to ``chromedriver_win32.zip``
        :param driver_path: where ``chromedriver.exe`` must end up (required)
        :param chrome_version: Chrome version to fetch the driver for; when
            omitted it is looked up by :meth:`update_safe`
        :raises FileNotFoundError: when ``driver_path`` is None
        """
        if driver_path is None:
            raise FileNotFoundError('driver path is None')
        self.driver_path = driver_path
        self.web_driver_url = web_driver_url if web_driver_url is not None \
            else 'https://chromedriver.storage.googleapis.com'
        self.web_driver_url_endpoint = web_driver_url_endpoint if web_driver_url_endpoint is not None \
            else 'chromedriver_win32.zip'
        self.chrome_version = chrome_version
        self.proxies = {
            'http': 'http://HKPriProxy.aia.biz:10938',
            'https': 'http://HKPriProxy.aia.biz:10938',
        }
        self.timeout = 60
        # NOTE(review): TLS certificate verification is disabled for the download.
        self.verify = False
        self.driver_filename = 'chromedriver.exe'

    def filter_version(self, raw_error_msg: str):
        """Extract the browser version from an error message into ``chrome_version``.

        :param raw_error_msg: text expected to contain
            ``Current browser version is <a.b.c.d>``; if that phrase is absent
            the first ``a.b.c.d`` number found is used instead
        :raises ValueError: when the text contains no four-part version number
        """
        text = raw_error_msg or ''
        found = self._BROWSER_VERSION_RE.search(text)
        if found is not None:
            self.chrome_version = found.group(1)
            return
        found = self._ANY_VERSION_RE.search(text)
        if found is None:
            raise ValueError(f'no browser version found in: {raw_error_msg!r}')
        self.chrome_version = found.group(0)

    @staticmethod
    def get_chrome_version() -> str:
        """Read the ``version`` value of Chrome's BLBeacon key for the current user."""
        # The context manager closes the registry handle once the value is read.
        with winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Software\Google\CHrome\BLBeacon") as key:
            chrome_version = winreg.QueryValueEx(key, 'version')[0]
        return chrome_version

    def update_safe(self) -> Optional[Path]:
        """Download a driver, walking the last version component down to 0.

        The current ``chrome_version`` is tried first; each time the archive
        is not found (404) the last component is decremented and the download
        is retried, down to and including ``.0``.  ``chrome_version`` is left
        at the last version that was attempted.

        :return: path of the installed driver, or None when no candidate
            version could be downloaded
        """
        if self.chrome_version is None:
            self.chrome_version = self.get_chrome_version()
        *prefix, build = self.chrome_version.split('.')
        for candidate in range(int(build), -1, -1):
            # Select the candidate *before* downloading so every build number,
            # including 0, is attempted exactly once.
            self.chrome_version = '.'.join([*prefix, str(candidate)])
            try:
                return self.update()
            except FileNotFoundError:
                continue
        return None

    def update(self) -> Optional[Path]:
        """Download the archive for ``chrome_version`` and extract the driver.

        The archive is saved as ``chrome_driver.zip`` next to ``driver_path``;
        an existing driver file is removed before the new one is extracted.

        :return: ``driver_path`` as a ``Path``
        :raises FileNotFoundError: when the server answers 404
        :raises ConnectionError: for any other non-200 response
        """
        # warnings.simplefilter('ignore', InsecureRequestWarning)
        zip_url = f"{self.web_driver_url}/{self.chrome_version}/{self.web_driver_url_endpoint}"
        r = requests.get(zip_url, proxies=self.proxies, timeout=self.timeout, verify=self.verify)
        if r.status_code == 404:
            raise FileNotFoundError(f"zip_url: {zip_url} is not found")
        if r.status_code != 200:
            raise ConnectionError(f'{zip_url} cannot access normally')

        chrome_driver_folder_path = os.path.dirname(self.driver_path)
        if chrome_driver_folder_path:
            os.makedirs(chrome_driver_folder_path, exist_ok=True)
        chrome_driver_zip_file_path = os.path.join(chrome_driver_folder_path, 'chrome_driver.zip')
        with open(chrome_driver_zip_file_path, mode='wb') as f:
            f.write(r.content)

        # Open the archive first: a corrupt download then fails before the
        # existing driver is deleted, and the handle is always closed.
        with ZipFile(chrome_driver_zip_file_path, 'r') as zip_file:
            if os.path.exists(self.driver_path):
                os.remove(self.driver_path)
            zip_file.extract(self.driver_filename, path=chrome_driver_folder_path)
        return Path(self.driver_path)

    @staticmethod
    def kill_process_in_windows(process_name: Optional[str] = None):
        """Force-kill every process with the given image name via ``taskkill``.

        :param process_name: image name to kill; defaults to ``chromedriver.exe``
        """
        if process_name is None:
            process_name = "chromedriver.exe"
        os.system("taskkill /f /im " + process_name)
class SeleniumTool(object):
    """Convenience wrapper around a Chrome WebDriver session.

    Bundles explicit-wait element lookups, click/typing helpers and the
    polling used to confirm that a file was downloaded to disk or shows up in
    the page's file list after an upload.
    """

    # Dated driver log file under ``log/`` three directories above this
    # module.  Nothing in this class reads or writes it.
    LOG_FILE = Path(__file__).resolve().parent.parent\
        .parent.joinpath(f"log/driver_{datetime.strftime(datetime.now(), '%m%d%Y')}.log")

    def __init__(self, is_headless: bool = False, download_dir: Optional[Path] = None):
        """Start Chrome, downloading a matching chromedriver when necessary.

        :param is_headless: add ``--headless`` to the Chrome arguments
        :param download_dir: Chrome's default download directory; left at
            Chrome's own default when None
        """
        # Set first so force_quit_driver()/__del__ are safe if start-up fails.
        self.driver = None
        self.explicit_wait_timeout = 30
        self.download_dir = download_dir
        self.windows_utils = WindowsUtils

        options = ChromeOptions()
        # NOTE(review): the flattened source does not show how many of the
        # following arguments were guarded by ``is_headless``; only
        # ``--headless`` is kept conditional here - confirm.
        if is_headless:
            options.add_argument('--headless')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-gpu')
        options.add_argument('--ignore-certificate-errors')
        options.add_argument('--disable-notifications')
        options.add_experimental_option("detach", True)
        prefs_params = {
            'download.prompt_for_download': False,
            'profile.managed_default_content_settings.images': 2,
        }
        if self.download_dir is not None:
            # Only override the directory when one was given; str(None) would
            # otherwise configure a literal "None" directory.
            prefs_params['download.default_directory'] = str(self.download_dir)
        options.add_experimental_option("prefs", prefs_params)
        options.add_experimental_option('excludeSwitches', ['enable-automation'])
        options.add_experimental_option('useAutomationExtension', False)

        driver_path = self.get_web_driver_path()
        if not driver_path.exists():
            ins = ChromeDriverUtil(driver_path=driver_path)
            ins.update_safe()
        try:
            self.driver = Chrome(options=options,
                                 executable_path=str(driver_path))
        except SessionNotCreatedException as e:
            # The driver on disk does not match the installed browser: read
            # the browser version from the error text, fetch a matching
            # driver and try once more.
            ins = ChromeDriverUtil(driver_path=driver_path)
            # ``msg`` carries the message text; ``args`` is not reliably
            # populated by Selenium's exception classes.
            ins.filter_version(raw_error_msg=e.msg or str(e))
            ins.update_safe()
            self.driver = Chrome(options=options,
                                 executable_path=str(driver_path))
        # self.driver.set_window_size(1920, 1080)
        js_zoom_out = "document.body.style.zoom='0.5'"
        self.driver.execute_script(js_zoom_out)
        self.action_chains = ActionChains(self.driver)

    @staticmethod
    def get_web_driver_path() -> Path:
        """Return the chromedriver location for the current platform.

        :return: ``/usr/bin/chromedriver`` on Linux, otherwise
            ``src/utils/chromedriver.exe`` under the directory three levels
            above this module
        """
        if 'linux' in sys.platform:
            # Always a Path: callers use ``.exists()`` on the result.
            return Path('/usr/bin/chromedriver')
        project_root = Path(__file__).resolve().parent.parent.parent
        return project_root.joinpath('src/utils/chromedriver.exe').resolve()

    @staticmethod
    def is_file_downloaded(file_path: Path, timeout: int = 300) -> bool:
        """Poll once per second until ``file_path`` exists.

        :param file_path: file expected to appear
        :param timeout: number of one-second waits before giving up
        :return: whether the file exists when polling stops
        :raises DownloadTimeoutException: when the file is still missing
            after ``timeout`` waits
        """
        waited = 0
        while not file_path.exists():
            # ``>=`` so a zero or negative timeout cannot loop forever.
            if waited >= timeout:
                raise DownloadTimeoutException(f'{file_path} -> timeout: {timeout} s')
            time.sleep(1)
            waited += 1
        return file_path.exists()

    def is_file_uploaded(self, filename: str, timeout: int = 7200) -> bool:
        """Poll the page's file list until an entry containing ``filename`` appears.

        The list is always checked at least once; between checks the method
        sleeps for one second.

        :param filename: text to look for in the file list entries
        :param timeout: maximum time to keep polling, in seconds
        :return: True once a matching entry is found
        :raises FileNotFoundError: when ``filename`` is empty
        :raises TimeoutException: when no entry matched before the deadline
        """
        if not filename:
            raise FileNotFoundError
        # A wall-clock deadline is used because a single check can itself
        # block for the explicit-wait timeout.
        deadline = time.monotonic() + timeout
        while True:
            if self.check_file_element_exist(filename=filename) is not None:
                return True
            if time.monotonic() >= deadline:
                raise TimeoutException(filename)
            time.sleep(1)

    def check_file_element_exist(self, filename: str) -> Optional["WebElement"]:
        """Return the first file-list element whose text contains ``filename``.

        :param filename: text to search for (substring match)
        :return: the matching element, or None when nothing matches
        """
        for item in self.get_all_file_elements():
            try:
                if filename in item.text:
                    return item
            except StaleElementReferenceException:
                # The list re-rendered while it was being read; skip this entry.
                continue
        return None

    def get_all_file_elements(self) -> List["WebElement"]:
        """Collect the file-name buttons currently shown in the file list.

        :return: the matching elements; an empty list when the list surface
            is missing, has a height below 1, or no file button became visible
            within the explicit-wait timeout
        """
        ms_list_surface_element_xpath = "//div[@role='presentation' and @class='ms-List-surface']"
        query_element_xpath = "//div[@class='Files-content']//div[@class='ms-List']" \
                              "//button[@role='link' and @data-automationid='FieldRenderer-name']"
        try:
            ms_list_surface_element = self.query_element(xpath_str=ms_list_surface_element_xpath)
            if ms_list_surface_element.rect['height'] < 1:
                return []
            return self.query_elements(xpath_str=query_element_xpath)
        except TimeoutException:
            # Treat "nothing visible yet" as an empty list so that callers
            # polling this method keep polling instead of crashing.
            return []

    def load_url(self, url: str = None):
        """Open ``url`` and resize the window to 1920x1080.

        :raises ValueError: when ``url`` is None or empty
        """
        if not url:
            raise ValueError(f'url {url} is not valid')
        self.driver.get(url)
        self.driver.set_window_size(1920, 1080)

    def refresh(self):
        """Reload the current page."""
        self.driver.refresh()

    def _wait(self, explicit_wait_timeout: Optional[int] = None) -> "WebDriverWait":
        """Build a WebDriverWait using the given timeout or the instance default."""
        if explicit_wait_timeout is None:
            explicit_wait_timeout = self.explicit_wait_timeout
        return WebDriverWait(self.driver, explicit_wait_timeout)

    def query_elements(self, xpath_str: str, explicit_wait_timeout: Optional[int] = None) -> List["WebElement"]:
        """Wait until all elements matching ``xpath_str`` are visible and return them.

        :raises TimeoutException: when the wait expires first
        """
        wait = self._wait(explicit_wait_timeout)
        return wait.until(EC.visibility_of_all_elements_located((By.XPATH, xpath_str)))

    def query_element(self, xpath_str: str, explicit_wait_timeout: Optional[int] = None) -> "WebElement":
        """Wait until one element matching ``xpath_str`` is visible and return it.

        :raises TimeoutException: when the wait expires first
        """
        wait = self._wait(explicit_wait_timeout)
        return wait.until(EC.visibility_of_element_located((By.XPATH, xpath_str)))

    def query_click_button(self, xpath_str: str, explicit_wait_timeout: Optional[int] = None) -> Optional["WebElement"]:
        """Wait until the element at ``xpath_str`` is clickable, click it and return it.

        :raises TimeoutException: when the wait expires first
        """
        element = self._wait(explicit_wait_timeout).until(EC.element_to_be_clickable((By.XPATH, xpath_str)))
        # self.action_chains.move_to_element(element).click(element).perform()
        element.click()
        return element

    def input_text_element(self, text_str: str, xpath_str: str,
                           explicit_wait_timeout: Optional[int] = None) -> "WebElement":
        """Type ``text_str`` into the first visible element matching ``xpath_str``.

        :return: the element that received the text
        """
        elements = self.query_elements(xpath_str=xpath_str, explicit_wait_timeout=explicit_wait_timeout)
        result = elements[0]
        result.send_keys(text_str)
        return result

    def get_alert(self, explicit_wait_timeout: Optional[int] = None) -> "Alert":
        """Wait for a browser alert to be present and return it.

        :raises TimeoutException: when no alert appears before the wait expires
        """
        self._wait(explicit_wait_timeout).until(EC.alert_is_present())
        return self.driver.switch_to.alert

    def __del__(self):
        self.force_quit_driver()

    def force_quit_driver(self):
        """Quit the browser session if one is open.

        Safe to call repeatedly and on an instance whose ``driver`` attribute
        was never set; the driver is quit at most once.
        """
        driver = getattr(self, 'driver', None)
        if not driver:
            return
        # Drop the reference first so later calls (including __del__) are no-ops.
        self.driver = None
        driver.quit()