0
点赞
收藏
分享

微信扫一扫

CFA_Project --- selenium tool

秀妮_5519 2022-03-22 阅读 21

testm_bak.py

import sys
import time
import os
import traceback
import logging
from pathlib import Path

from zipfile import ZipFile
# Make this file's own directory importable. The original read `Path(file)`,
# which raises NameError at import time; `__file__` is the intended name.
sys.path.append(str(Path(__file__).parent.resolve()))
import shutil
# Also expose the parent directory; presumably this is where the `utils`
# package and `Data_Process` module imported below live -- TODO confirm.
sys.path.append(str(Path(__file__).parent.parent.resolve()))
from selenium.common.exceptions import StaleElementReferenceException, SessionNotCreatedException
import pytest
import requests
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import StaleElementReferenceException, SessionNotCreatedException, TimeoutException
from selenium.webdriver.chrome.options import Options
from utils.sim_selenium_tool import ChromeDriverUtil, SeleniumTool
from Data_Process import writelog

# Timestamps are captured once, at import time.
dtf = time.strftime("%y%m%d", time.localtime())            # two-digit year, used for the log file name
dtfs = time.strftime("%y%m%d_%H%M%S", time.localtime())
# Directory and file that `writelog` is pointed at by the functions below.
mkpathlog = r"C:\Users\Public\CashFlowAutomationlog"
logfile = "CFAlog" + dtf + ".log"
logfilepath = os.path.join(mkpathlog, logfile)
# NOTE: `dtf` is deliberately rebound to the four-digit-year form *after* the
# log file name has been built; the log name keeps the two-digit year.
dtf = time.strftime("%Y%m%d", time.localtime())
def mainforCFA(filename, mkph):
    """Download one file from the configured SharePoint folder (CFA flow).

    The folder URL is the first line of ``downloadConfig.txt``. Grid cells on
    the page are scanned for a row check box (class contains
    ``ms-DetailsRow-check``) whose ``aria-label`` contains ``filename``; the
    first match is ticked and downloaded through the command bar.

    Failures are printed and passed to ``writelog``; nothing is re-raised.

    Args:
        filename: Name (or distinguishing substring) of the file to download.
        mkph: Directory given to ``SeleniumTool`` as its download directory.
    """
    # Local import: `find_elements_by_tag_name` was removed in Selenium 4.3;
    # `find_elements(By.TAG_NAME, ...)` works on both Selenium 3 and 4.
    from selenium.webdriver.common.by import By

    print('src.demo.testm_bak:')
    selenium_tool = None  # stays None if the constructor itself fails
    try:
        download_dir = mkph
        print('download_dir:')
        print(download_dir)
        selenium_tool = SeleniumTool(is_headless=False, download_dir=download_dir)

        with open(r"C:\Users\Public\pythonAutomationConfig\downloadConfig.txt", "r", encoding='utf-8') as f:
            # strip(): readline() keeps the trailing newline
            test_sharepoint_url = f.readline().strip()
        download_file_name = filename
        print(f"test_sharepoint_url: {test_sharepoint_url}")
        print("download_file_name:" + download_file_name)
        selenium_tool.load_url(test_sharepoint_url)

        # The sign-in step is disabled in this variant; the address is only echoed.
        with open(r"C:\Users\Public\pythonAutomationConfig\mailConfig.txt", "r", encoding='utf-8') as f:
            mailname = f.readline().strip()
        print('mailname:')
        print(mailname)

        checkbox_list_xpath = "//div[@role='gridcell']"
        elements = selenium_tool.query_elements(xpath_str=checkbox_list_xpath, explicit_wait_timeout=10)

        selected_label = None
        for cell in elements or []:
            for sub_e in cell.find_elements(By.TAG_NAME, "div"):
                try:
                    # get_attribute returns None when the attribute is absent
                    class_name = sub_e.get_attribute('class') or ''
                except Exception:
                    # element vanished / went stale between lookup and read
                    continue
                if 'ms-DetailsRow-check' not in class_name:
                    continue
                label = sub_e.get_attribute('aria-label')
                if label is None or download_file_name not in label:
                    continue
                sub_e.click()
                selected_label = label
                time.sleep(0.2)
                break
            if selected_label is not None:
                break

        if selected_label is not None:
            download_file = Path(selenium_tool.download_dir).joinpath(download_file_name).resolve()
            print(f"download_file: {download_file}")
            print(f"selenium_tool.download_dir: {selenium_tool.download_dir}")

            # Remove a stale copy so the download check cannot be satisfied by it.
            if download_file.exists():
                download_file.unlink()

            # Expand the collapsed command bar first, then press Download.
            xpath_str = '//button[@data-automationid="expandCollapseList"]'
            selenium_tool.query_click_button(xpath_str=xpath_str)

            xpath_str = '//button[@data-automationid="downloadCommand"]'
            selenium_tool.query_click_button(xpath_str=xpath_str)
            selenium_tool.is_file_downloaded(file_path=download_file)
        else:
            print(f"no row matching {download_file_name!r} was found; nothing downloaded")

    except Exception:
        e_info = traceback.format_exc()
        print(e_info)
        print('exception')
        writelog(logfilepath, e_info)
    finally:
        if selenium_tool is not None:
            selenium_tool.force_quit_driver()

def upload_fileforCFA(finaoutputfile):
    """Upload a file from the AdjustmentFile folder to SharePoint (CFA flow).

    The target folder URL is the first line of ``uploadConfig.txt``. The file
    path is typed into the native file dialog, an optional "Replace" prompt is
    confirmed, and the upload is then verified with ``is_file_uploaded``.
    A verification failure is printed and passed to ``writelog``.

    Args:
        finaoutputfile: File name (relative to ``C:\\Users\\Public\\AdjustmentFile``)
            to upload.
    """
    selenium_tool = SeleniumTool(is_headless=False)
    try:
        with open(r"C:\Users\Public\pythonAutomationConfig\uploadConfig.txt", "r", encoding='utf-8') as f:
            # strip(): readline() keeps the trailing newline
            test_sharepoint_url = f.readline().strip()
        selenium_tool.load_url(test_sharepoint_url)

        up_file_path = r'C:\Users\Public\AdjustmentFile'
        file_name = finaoutputfile
        upload_file_name = os.path.join(up_file_path, file_name)
        print('upload_file_name: ' + upload_file_name)
        upload_button_xpath = "//button[@name='Upload']"
        upload_file_button_xpath = "//button[@name='Files']"
        local_filepath = Path(__file__).parent.joinpath(upload_file_name).resolve()

        time.sleep(5)  # fixed wait kept from the original before touching the toolbar
        selenium_tool.query_click_button(xpath_str=upload_button_xpath)
        selenium_tool.query_click_button(xpath_str=upload_file_button_xpath)
        selenium_tool.windows_utils.input_local_filepath_for_native_window_win32gui(filepath=str(local_filepath))
        time.sleep(0.2)

        replace_button_xpath = '//div[@role="alert"]//button[@name="Replace"]'
        try:
            selenium_tool.query_click_button(xpath_str=replace_button_xpath, explicit_wait_timeout=5)
        except TimeoutException:
            # Presumably no "Replace" prompt appears when the file is new.
            # The original quit the driver here and then still used it for
            # the check below; keep it alive instead.
            print('timeout')
        time.sleep(0.1)

        file_name = file_name.split('\\')[-1]
        print(f"file_name {file_name}")
        try:
            selenium_tool.is_file_uploaded(filename=file_name)
        except Exception:
            e_info = traceback.format_exc()
            print(f"{file_name} == {e_info}")
            writelog(logfilepath, e_info)
        else:
            # Only report completion when the check did not raise.
            print(f"upload complete {file_name}")
    finally:
        # Always release the browser, including on early failures.
        selenium_tool.force_quit_driver()

def main(filename, mkph):
    """Sign in if prompted, then download one file from the configured folder.

    The folder URL is the first line of ``downloadConfig.txt`` and the account
    name the first line of ``mailConfig.txt``. If an e-mail input is present
    the account name is entered and submitted. Grid cells are then scanned for
    a row check box (class contains ``ms-DetailsRow-check``) whose
    ``aria-label`` contains ``filename``; the first match is ticked and
    downloaded through the command bar.

    Failures are printed; nothing is re-raised.

    Args:
        filename: Name (or distinguishing substring) of the file to download.
        mkph: Directory given to ``SeleniumTool`` as its download directory.
    """
    # Local import: `find_elements_by_tag_name` was removed in Selenium 4.3;
    # `find_elements(By.TAG_NAME, ...)` works on both Selenium 3 and 4.
    from selenium.webdriver.common.by import By

    print('src.demo.testm_bak:')
    selenium_tool = None  # stays None if the constructor itself fails
    try:
        download_dir = mkph
        print('download_dir:')
        print(download_dir)
        selenium_tool = SeleniumTool(is_headless=False, download_dir=download_dir)

        with open(r"C:\Users\Public\pythonAutomationConfig\downloadConfig.txt", "r", encoding='utf-8') as f:
            # strip(): readline() keeps the trailing newline
            test_sharepoint_url = f.readline().strip()
        download_file_name = filename
        print(f"test_sharepoint_url: {test_sharepoint_url}")
        print("download_file_name:" + download_file_name)
        selenium_tool.load_url(test_sharepoint_url)

        validate_account_xpath = '//input[@type="email"]'
        elements = selenium_tool.query_elements(xpath_str=validate_account_xpath)
        print('elements:')
        print(elements)
        with open(r"C:\Users\Public\pythonAutomationConfig\mailConfig.txt", "r", encoding='utf-8') as f:
            # strip(): a trailing newline would be typed into the login field
            mailname = f.readline().strip()
        print('mailname:')
        print(mailname)
        if elements:
            selenium_tool.input_text_element(text_str=mailname, xpath_str=validate_account_xpath)
            next_button_xpath = '//input[@type="submit"]'
            selenium_tool.query_click_button(xpath_str=next_button_xpath)

        checkbox_list_xpath = "//div[@role='gridcell']"
        elements = selenium_tool.query_elements(xpath_str=checkbox_list_xpath, explicit_wait_timeout=10)

        selected_label = None
        for cell in elements or []:
            for sub_e in cell.find_elements(By.TAG_NAME, "div"):
                try:
                    # get_attribute returns None when the attribute is absent
                    class_name = sub_e.get_attribute('class') or ''
                except Exception:
                    # element vanished / went stale between lookup and read
                    continue
                if 'ms-DetailsRow-check' not in class_name:
                    continue
                label = sub_e.get_attribute('aria-label')
                if label is None or download_file_name not in label:
                    continue
                sub_e.click()
                selected_label = label
                time.sleep(0.2)
                break
            if selected_label is not None:
                break

        if selected_label is not None:
            download_file = Path(selenium_tool.download_dir).joinpath(download_file_name).resolve()
            print(f"download_file: {download_file}")
            print(f"selenium_tool.download_dir: {selenium_tool.download_dir}")

            # Remove a stale copy so the download check cannot be satisfied by it.
            if download_file.exists():
                download_file.unlink()

            # Expand the collapsed command bar first, then press Download.
            xpath_str = '//button[@data-automationid="expandCollapseList"]'
            selenium_tool.query_click_button(xpath_str=xpath_str)

            xpath_str = '//button[@data-automationid="downloadCommand"]'
            selenium_tool.query_click_button(xpath_str=xpath_str)
            selenium_tool.is_file_downloaded(file_path=download_file)
        else:
            print(f"no row matching {download_file_name!r} was found; nothing downloaded")

    except Exception:
        e_info = traceback.format_exc()
        print(e_info)
        print('exception')
    finally:
        if selenium_tool is not None:
            selenium_tool.force_quit_driver()

def main_bak():
    """Legacy, fully hard-coded download of ``1.txt`` from a test folder.

    Loads a fixed SharePoint URL, signs in with a fixed account if an e-mail
    input is shown, ticks the first grid element whose ``aria-label`` contains
    ``1.txt`` and presses Download, saving into ``C:\\wz\\pythonFile``.
    Failures are printed; nothing is re-raised.
    """
    # Local import: `find_elements_by_tag_name` was removed in Selenium 4.3;
    # `find_elements(By.TAG_NAME, ...)` works on both Selenium 3 and 4.
    from selenium.webdriver.common.by import By

    selenium_tool = None  # stays None if the constructor itself fails
    try:
        download_dir = r"C:\wz\pythonFile"
        selenium_tool = SeleniumTool(is_headless=False, download_dir=download_dir)
        test_sharepoint_url = "https://aiacom.sharepoint.com/sites/EASteam/Shared%20Documents/Forms/AllItems.aspx?id=%2Fsites%2FEASteam%2FShared%20Documents%2Ftest1&viewid=dbf4c9a2%2Ddf5a%2D4917%2Db2ff%2Da317c21f608a"
        download_file_name = '1.txt'
        print("download_file_name:" + download_file_name)
        selenium_tool.load_url(test_sharepoint_url)

        validate_account_xpath = '//input[@type="email"]'
        elements = selenium_tool.query_elements(xpath_str=validate_account_xpath)
        if elements:
            selenium_tool.input_text_element(text_str="Eoin-Z.Wang@aia.com", xpath_str=validate_account_xpath)
            next_button_xpath = '//input[@type="submit"]'
            selenium_tool.query_click_button(xpath_str=next_button_xpath)

        checkbox_list_xpath = "//div[@role='gridcell']"
        elements = selenium_tool.query_elements(xpath_str=checkbox_list_xpath, explicit_wait_timeout=10)
        found = False
        for cell in elements or []:
            for sub_e in cell.find_elements(By.TAG_NAME, "div"):
                label = sub_e.get_attribute('aria-label')
                if label is not None and download_file_name in label:
                    sub_e.click()
                    found = True
                    time.sleep(0.3)
                    break
            if found:
                break

        # As in the original, the download is attempted even if no row matched.
        download_file = Path(selenium_tool.download_dir).joinpath(download_file_name).resolve()
        print(f"download_file: {download_file}")
        print(f"selenium_tool.download_dir: {selenium_tool.download_dir}")
        if download_file.exists():
            download_file.unlink()
        xpath_str = '//button[@data-automationid="downloadCommand"]'
        selenium_tool.query_click_button(xpath_str=xpath_str)
        selenium_tool.is_file_downloaded(file_path=download_file)

    except Exception:
        # Show the cause instead of only the bare 'exception' marker.
        print(traceback.format_exc())
        print('exception')
    finally:
        if selenium_tool is not None:
            selenium_tool.force_quit_driver()

def download_files_batch(filename):
    """Download ``Mapping Table.xlsx`` and ``filename`` as one zip and unpack it.

    Both rows are ticked on a hard-coded SharePoint test folder, Download is
    pressed, and the resulting ``OneDrive_1_<MM-DD-YYYY>.zip`` (saved next to
    this module) is extracted into ``C:\\Users\\Public\\pythonFile``.

    Args:
        filename: Exact ``aria-label`` of the second file to select.

    Raises:
        KeyError: From ``ZipFile.extract`` if a requested name is not in the zip.
    """
    # Local import: `find_elements_by_tag_name` was removed in Selenium 4.3;
    # `find_elements(By.TAG_NAME, ...)` works on both Selenium 3 and 4.
    from selenium.webdriver.common.by import By

    download_dir = Path(__file__).parent.resolve()
    selenium_tool = SeleniumTool(is_headless=False, download_dir=download_dir)
    try:
        download_file_name_list = ['Mapping Table.xlsx', filename]
        print(download_file_name_list)
        selenium_tool.load_url("https://aiacom.sharepoint.com/sites/EASteam/Shared%20Documents/Forms/AllItems.aspx?id=%2Fsites%2FEASteam%2FShared%20Documents%2Ftest1&viewid=dbf4c9a2%2Ddf5a%2D4917%2Db2ff%2Da317c21f608a")

        validate_account_xpath = '//input[@type="email"]'
        elements = selenium_tool.query_elements(xpath_str=validate_account_xpath)
        if elements:
            selenium_tool.input_text_element(text_str="Eoin-Z.Wang@aia.com", xpath_str=validate_account_xpath)
            next_button_xpath = '//input[@type="submit"]'
            selenium_tool.query_click_button(xpath_str=next_button_xpath)

        checkbox_list_xpath = "//div[@role='gridcell']"
        elements = selenium_tool.query_elements(xpath_str=checkbox_list_xpath, explicit_wait_timeout=10)
        if elements is None:
            return
        for cell in elements:
            for sub_e in cell.find_elements(By.TAG_NAME, "div"):
                print(f"sub_e: {sub_e}")
                if sub_e.get_attribute('aria-label') in download_file_name_list:
                    try:
                        sub_e.click()
                        time.sleep(0.3)
                    except Exception as exc:
                        # Best effort, as before, but no longer silent.
                        print(f"click failed: {exc}")

        download_file = Path(selenium_tool.download_dir).joinpath(
            "OneDrive_1_" + time.strftime('%m-%d-%Y') + ".zip").resolve()
        # Remove a stale archive so the download check cannot be satisfied by it.
        if download_file.exists():
            download_file.unlink()
        xpath_str = '//button[@data-automationid="downloadCommand"]'
        selenium_tool.query_click_button(xpath_str=xpath_str)
        selenium_tool.is_file_downloaded(file_path=download_file)

        target_file_path = r"C:\Users\Public\pythonFile"
        # Context manager closes the archive handle (the original left it open).
        with ZipFile(download_file, 'r') as zip_file:
            for file_name in download_file_name_list:
                zip_file.extract(file_name, path=target_file_path)
    finally:
        # The original never quit the browser; release it on every path.
        selenium_tool.force_quit_driver()

def download_files_batch1(filename, mkph):
    """Download ``Mapping Table.xlsx`` and ``filename`` into the pythonFile folder.

    Rows whose ``aria-label`` equals one of the two names are ticked on a
    hard-coded SharePoint test folder. When exactly one row was ticked the
    file itself is expected as the download; otherwise the expected download is
    ``OneDrive_1_<MM-DD-YYYY>.zip``, which is extracted in place when more than
    one row was ticked.

    Args:
        filename: Exact ``aria-label`` of the second file to select.
        mkph: Unused; kept so existing callers keep working.

    Raises:
        KeyError: From ``ZipFile.extract`` if a requested name is not in the zip.
    """
    # Local import: `find_elements_by_tag_name` was removed in Selenium 4.3;
    # `find_elements(By.TAG_NAME, ...)` works on both Selenium 3 and 4.
    from selenium.webdriver.common.by import By

    target_file_path = r"C:\Users\Public\pythonFile"
    selenium_tool = SeleniumTool(is_headless=False, download_dir=target_file_path)
    try:
        download_file_name_list = ['Mapping Table.xlsx', filename]
        selenium_tool.load_url("https://aiacom.sharepoint.com/sites/EASteam/Shared%20Documents/Forms/AllItems.aspx?id=%2Fsites%2FEASteam%2FShared%20Documents%2Ftest&viewid=dbf4c9a2%2Ddf5a%2D4917%2Db2ff%2Da317c21f608a")

        validate_account_xpath = '//input[@type="email"]'
        elements = selenium_tool.query_elements(xpath_str=validate_account_xpath)
        if elements:
            selenium_tool.input_text_element(text_str="Eoin-Z.Wang@aia.com", xpath_str=validate_account_xpath)
            next_button_xpath = '//input[@type="submit"]'
            selenium_tool.query_click_button(xpath_str=next_button_xpath)

        checkbox_list_xpath = "//div[@role='gridcell']"
        elements = selenium_tool.query_elements(xpath_str=checkbox_list_xpath, explicit_wait_timeout=10)
        actually_downloaded_file_list = []
        if elements is None:
            return
        for i, cell in enumerate(elements):
            for sub_e in cell.find_elements(By.TAG_NAME, "div"):
                print(f"i for element: {i}")
                if sub_e.get_attribute('aria-label') in download_file_name_list:
                    try:
                        sub_e.click()
                        time.sleep(1)
                        actually_downloaded_file_list.append(sub_e.get_attribute('aria-label'))
                    except Exception as exc:
                        # Best effort, as before, but no longer silent.
                        print(f"click failed: {exc}")

        if len(actually_downloaded_file_list) == 1:
            download_file = Path(selenium_tool.download_dir).joinpath(actually_downloaded_file_list[0]).resolve()
        else:
            download_file = Path(selenium_tool.download_dir).joinpath(
                "OneDrive_1_" + time.strftime('%m-%d-%Y') + ".zip").resolve()
        # Remove a stale copy so the download check cannot be satisfied by it.
        if download_file.exists():
            download_file.unlink()
        xpath_str = '//button[@data-automationid="downloadCommand"]'
        selenium_tool.query_click_button(xpath_str=xpath_str)
        selenium_tool.is_file_downloaded(file_path=download_file)

        if len(actually_downloaded_file_list) > 1:
            # Context manager closes the archive handle (the original left it open).
            with ZipFile(download_file, 'r') as zip_file:
                for file_name in download_file_name_list:
                    zip_file.extract(file_name, path=target_file_path)
    finally:
        # The original never quit the browser; release it on every path.
        selenium_tool.force_quit_driver()

def download_files_batch_bak(filename):
    """Download the mapping table plus ``filename``, then relocate the mapping table.

    Rows whose ``aria-label`` equals ``Mapping Table.xlsx`` or ``filename`` are
    ticked on the hard-coded Cashflowfileupload test folder and downloaded into
    ``C:\\Users\\Public\\pythonFile`` (a zip is extracted when more than one
    row was ticked). Afterwards ``Mapping Table.xlsx`` is moved to
    ``C:\\Users\\Public\\pythonFile1``.

    Args:
        filename: Exact ``aria-label`` of the second file to select.

    Raises:
        KeyError: From ``ZipFile.extract`` if a requested name is not in the zip.
        OSError / shutil.Error: From ``shutil.move`` if the mapping table is
            missing or already present at the destination.
    """
    # Local import: `find_elements_by_tag_name` was removed in Selenium 4.3;
    # `find_elements(By.TAG_NAME, ...)` works on both Selenium 3 and 4.
    from selenium.webdriver.common.by import By

    target_file_path = r"C:\Users\Public\pythonFile"
    selenium_tool = SeleniumTool(is_headless=False, download_dir=target_file_path)
    try:
        download_file_name_list = ['Mapping Table.xlsx']
        download_file_name_list.append(filename)
        print('download_file_name_list:11')
        print(download_file_name_list)
        selenium_tool.load_url("https://aiacom.sharepoint.com/sites/Cashflowfileupload/Shared%20Documents/Forms/AllItems.aspx?FolderCTID=0x0120006D8365F3F6FA34409FEFCC9C266CDA51&id=%2Fsites%2FCashflowfileupload%2FShared%20Documents%2Ftest&viewid=8a7b63cc%2Dde2c%2D4086%2Dbb9e%2D5f9943d8302b")

        validate_account_xpath = '//input[@type="email"]'
        elements = selenium_tool.query_elements(xpath_str=validate_account_xpath)
        if elements:
            selenium_tool.input_text_element(text_str="Eoin-Z.Wang@aia.com", xpath_str=validate_account_xpath)
            next_button_xpath = '//input[@type="submit"]'
            selenium_tool.query_click_button(xpath_str=next_button_xpath)

        checkbox_list_xpath = "//div[@role='gridcell']"
        elements = selenium_tool.query_elements(xpath_str=checkbox_list_xpath, explicit_wait_timeout=10)
        actually_downloaded_file_list = []
        if elements is None:
            return
        for i, cell in enumerate(elements):
            for sub_e in cell.find_elements(By.TAG_NAME, "div"):
                print(f"i for element: {i}")
                if sub_e.get_attribute('aria-label') in download_file_name_list:
                    try:
                        sub_e.click()
                        time.sleep(1)
                        actually_downloaded_file_list.append(sub_e.get_attribute('aria-label'))
                    except Exception as exc:
                        # Best effort, as before, but no longer silent.
                        print(f"click failed: {exc}")

        if len(actually_downloaded_file_list) == 1:
            download_file = Path(selenium_tool.download_dir).joinpath(actually_downloaded_file_list[0]).resolve()
        else:
            download_file = Path(selenium_tool.download_dir).joinpath(
                "OneDrive_1_" + time.strftime('%m-%d-%Y') + ".zip").resolve()
        # Remove a stale copy so the download check cannot be satisfied by it.
        if download_file.exists():
            download_file.unlink()
        xpath_str = '//button[@data-automationid="downloadCommand"]'
        selenium_tool.query_click_button(xpath_str=xpath_str)
        selenium_tool.is_file_downloaded(file_path=download_file)

        if len(actually_downloaded_file_list) > 1:
            # Context manager closes the archive handle (the original left it open).
            with ZipFile(download_file, 'r') as zip_file:
                for file_name in download_file_name_list:
                    zip_file.extract(file_name, path=target_file_path)

        time.sleep(0.3)
        orignalfile = r'C:\Users\Public\pythonFile\Mapping Table.xlsx'
        targetfilepath = r'C:\Users\Public\pythonFile1'
        shutil.move(orignalfile, targetfilepath)
        time.sleep(0.1)
        # Same path the original rebuilt with the invalid escape '\M'.
        if os.path.exists(orignalfile):
            os.remove(orignalfile)
    finally:
        # The original never quit the browser; release it on every path.
        selenium_tool.force_quit_driver()

def download_files_batch_bakforOnefile(filename):
    """Download a single file from the hard-coded Cashflowfileupload test folder.

    Rows whose ``aria-label`` equals ``filename`` are ticked and downloaded
    into ``C:\\Users\\Public\\pythonFile``. When exactly one row was ticked the
    file itself is expected as the download; otherwise the expected download is
    ``OneDrive_1_<MM-DD-YYYY>.zip``, which is extracted when more than one row
    was ticked. Scanning stops after grid cell index 102.

    Args:
        filename: Exact ``aria-label`` of the file to select.
    """
    # Local import: `find_elements_by_tag_name` was removed in Selenium 4.3;
    # `find_elements(By.TAG_NAME, ...)` works on both Selenium 3 and 4.
    from selenium.webdriver.common.by import By

    target_file_path = r"C:\Users\Public\pythonFile"
    selenium_tool = SeleniumTool(is_headless=False, download_dir=target_file_path)
    try:
        download_file_name_list = []
        download_file_name_list.append(filename)
        selenium_tool.load_url("https://aiacom.sharepoint.com/sites/Cashflowfileupload/Shared%20Documents/Forms/AllItems.aspx?FolderCTID=0x0120006D8365F3F6FA34409FEFCC9C266CDA51&id=%2Fsites%2FCashflowfileupload%2FShared%20Documents%2Ftest&viewid=8a7b63cc%2Dde2c%2D4086%2Dbb9e%2D5f9943d8302b")

        validate_account_xpath = '//input[@type="email"]'
        elements = selenium_tool.query_elements(xpath_str=validate_account_xpath)
        if elements:
            selenium_tool.input_text_element(text_str="Eoin-Z.Wang@aia.com", xpath_str=validate_account_xpath)
            next_button_xpath = '//input[@type="submit"]'
            selenium_tool.query_click_button(xpath_str=next_button_xpath)

        checkbox_list_xpath = "//div[@role='gridcell']"
        elements = selenium_tool.query_elements(xpath_str=checkbox_list_xpath, explicit_wait_timeout=10)
        actually_downloaded_file_list = []
        if elements is None:
            return
        for i, cell in enumerate(elements):
            if i > 102:
                # The original quit the driver here but kept looping on the
                # dead session. Stop instead; `finally` closes the browser.
                # NOTE(review): confirm that giving up quietly is acceptable.
                print(f"gave up after {i} grid cells without finishing the scan")
                return
            for sub_e in cell.find_elements(By.TAG_NAME, "div"):
                print(f"i for element: {i}")
                if sub_e.get_attribute('aria-label') in download_file_name_list:
                    try:
                        sub_e.click()
                        time.sleep(1)
                        actually_downloaded_file_list.append(sub_e.get_attribute('aria-label'))
                    except Exception as exc:
                        # Best effort, as before, but no longer silent.
                        print(f"click failed: {exc}")

        if len(actually_downloaded_file_list) == 1:
            download_file = Path(selenium_tool.download_dir).joinpath(actually_downloaded_file_list[0]).resolve()
        else:
            download_file = Path(selenium_tool.download_dir).joinpath(
                "OneDrive_1_" + time.strftime('%m-%d-%Y') + ".zip").resolve()
        # Remove a stale copy so the download check cannot be satisfied by it.
        if download_file.exists():
            download_file.unlink()
        xpath_str = '//button[@data-automationid="downloadCommand"]'
        selenium_tool.query_click_button(xpath_str=xpath_str)
        selenium_tool.is_file_downloaded(file_path=download_file)

        if len(actually_downloaded_file_list) > 1:
            # Context manager closes the archive handle (the original left it open).
            with ZipFile(download_file, 'r') as zip_file:
                for file_name in download_file_name_list:
                    zip_file.extract(file_name, path=target_file_path)
    finally:
        # Release the browser on every path.
        selenium_tool.force_quit_driver()

def upload_file(finaoutputfile):
    """Sign in if prompted and upload a file from the AdjustmentFile folder.

    The target folder URL is the first line of ``uploadConfig.txt`` and the
    account name the first line of ``mailConfig.txt``. The file path is typed
    into the native file dialog, an optional "Replace" prompt is confirmed, and
    the upload is then verified with ``is_file_uploaded``. A verification
    failure is printed; nothing is re-raised from that step.

    Args:
        finaoutputfile: File name (relative to ``C:\\Users\\Public\\AdjustmentFile``)
            to upload.
    """
    selenium_tool = SeleniumTool(is_headless=False)
    try:
        with open(r"C:\Users\Public\pythonAutomationConfig\uploadConfig.txt", "r", encoding='utf-8') as f:
            # strip(): readline() keeps the trailing newline
            test_sharepoint_url = f.readline().strip()
        selenium_tool.load_url(test_sharepoint_url)

        validate_account_xpath = '//input[@type="email"]'
        elements = selenium_tool.query_elements(xpath_str=validate_account_xpath)
        with open(r"C:\Users\Public\pythonAutomationConfig\mailConfig.txt", "r", encoding='utf-8') as f:
            # strip(): a trailing newline would be typed into the login field
            mailname = f.readline().strip()
        if elements:
            selenium_tool.input_text_element(text_str=mailname, xpath_str=validate_account_xpath)
            next_button_xpath = '//input[@type="submit"]'
            selenium_tool.query_click_button(xpath_str=next_button_xpath)

        up_file_path = r'C:\Users\Public\AdjustmentFile'
        file_name = finaoutputfile
        upload_file_name = os.path.join(up_file_path, file_name)
        print('upload_file_name: ' + upload_file_name)
        upload_button_xpath = "//button[@name='Upload']"
        upload_file_button_xpath = "//button[@name='Files']"
        local_filepath = Path(__file__).parent.joinpath(upload_file_name).resolve()
        selenium_tool.query_click_button(xpath_str=upload_button_xpath)
        selenium_tool.query_click_button(xpath_str=upload_file_button_xpath)
        selenium_tool.windows_utils.input_local_filepath_for_native_window_win32gui(filepath=str(local_filepath))
        time.sleep(0.2)

        replace_button_xpath = '//div[@role="alert"]//button[@name="Replace"]'
        try:
            selenium_tool.query_click_button(xpath_str=replace_button_xpath, explicit_wait_timeout=5)
        except TimeoutException:
            # Presumably no "Replace" prompt appears when the file is new.
            # The original quit the driver here and then still used it for
            # the check below; keep it alive instead.
            print('timeout')
        time.sleep(0.2)

        file_name = file_name.split('\\')[-1]
        print(f"file_name {file_name}")
        try:
            selenium_tool.is_file_uploaded(filename=file_name)
        except Exception:
            e_info = traceback.format_exc()
            print(f"{file_name} == {e_info}")
        else:
            # Only report completion when the check did not raise.
            print(f"upload complete {file_name}")
    finally:
        # Always release the browser, including on early failures.
        selenium_tool.force_quit_driver()

def upload_file1130(finaoutputfile):
    r"""Upload ``finaoutputfile`` to the Cashflowfileupload SharePoint library.

    Starts a visible Chrome session, types the sign-in e-mail into the login
    form, drives the "Upload > Files" native dialog and then polls the
    document list until the uploaded file name shows up.

    :param finaoutputfile: file name (or path) joined onto
        ``C:\Users\Public\pythonFile1``; only the part after the last
        backslash is looked up in the SharePoint list.
    """
    selenium_tool = SeleniumTool(is_headless=False)
    try:
        test_sharepoint_url = "https://aiacom.sharepoint.com/sites/Cashflowfileupload/Shared%20Documents/Forms/AllItems.aspx?FolderCTID=0x0120006D8365F3F6FA34409FEFCC9C266CDA51&id=%2Fsites%2FCashflowfileupload%2FShared%20Documents%2FUploadFile&viewid=8a7b63cc%2Dde2c%2D4086%2Dbb9e%2D5f9943d8302b"
        selenium_tool.load_url(test_sharepoint_url)

        # Sign in: submit the account e-mail when the login form is present.
        validate_account_xpath = '//input[@type="email"]'
        elements = selenium_tool.query_elements(xpath_str=validate_account_xpath)
        if len(elements) > 0:
            selenium_tool.input_text_element(text_str="Eoin-Z.Wang@aia.com", xpath_str=validate_account_xpath)
            next_button_xpath = '//input[@type="submit"]'
            selenium_tool.query_click_button(xpath_str=next_button_xpath)

        up_file_path = r'C:\Users\Public\pythonFile1'
        file_name = finaoutputfile
        upload_file_name = os.path.join(up_file_path, file_name)
        print('upload_file_name: ' + upload_file_name)
        upload_button_xpath = "//button[@name='Upload']"
        upload_file_button_xpath = "//button[@name='Files']"
        local_filepath = Path(__file__).parent.joinpath(upload_file_name).resolve()
        selenium_tool.query_click_button(xpath_str=upload_button_xpath)
        selenium_tool.query_click_button(xpath_str=upload_file_button_xpath)
        selenium_tool.windows_utils.input_local_filepath_for_native_window_win32gui(filepath=str(local_filepath))
        time.sleep(0.3)

        # The "Replace" prompt only appears when a file of the same name is
        # already in the library, so a timeout here is the normal case for a
        # new file. The driver must stay alive for the upload check below.
        replace_button_xpath = '//div[@role="alert"]//button[@name="Replace"]'
        try:
            selenium_tool.query_click_button(xpath_str=replace_button_xpath, explicit_wait_timeout=5)
        except TimeoutException:
            print('timeout')
        time.sleep(0.2)

        file_name = file_name.split('\\')[-1]
        selenium_tool.is_file_uploaded(filename=file_name)
    finally:
        # Always release the browser, even when a step above raised.
        selenium_tool.force_quit_driver()
# import shutil
# # finaoutputfile
# orignalfilepath = r'C:\Users\Public\pythonFile1'
# orignalfile = os.path.join(orignalfilepath,finaoutputfile)
# targetfilepath = r'C:\wz\Moses'
# print('start to move file')
# shutil.move(orignalfile, targetfilepath)
# 最后删除文件
# time.sleep(0.5)
# print('start to remove file')
# for root, dirs, files in os.walk(r"C:\Users\Public\pythonFile"):
#     for name in files:
#         if name.endswith(".xlsx"):  # 指定要删除的文件格式,这里是png,可以换成其他格式
#             os.remove(os.path.join(root, name))
#             print("Delete File: " + os.path.join(root, name))
#         if name.endswith(".csv"):  # 指定要删除的文件格式,这里是png,可以换成其他格式
#             os.remove(os.path.join(root, name))
#             print("Delete File: " + os.path.join(root, name))
#         # if name.endswith(".txt"):  # 指定要删除的文件格式,这里是png,可以换成其他格式
#         #     os.remove(os.path.join(root, name))
#         #     print("Delete File: " + os.path.join(root, name))

def upload_file_bak1():
    r"""Upload the fixed test file ``C:\wz\pythonFile\3333.xlsx`` to the EASteam test folder.

    Starts a visible Chrome session, types the sign-in e-mail into the login
    form, drives the "Upload > Files" native dialog, accepts the "Replace"
    prompt when it appears, and waits until ``3333.xlsx`` is listed.
    """
    selenium_tool = SeleniumTool(is_headless=False)
    try:
        test_sharepoint_url = "https://aiacom.sharepoint.com/sites/EASteam/Shared%20Documents/Forms/AllItems.aspx?id=%2Fsites%2FEASteam%2FShared%20Documents%2Ftest&viewid=dbf4c9a2%2Ddf5a%2D4917%2Db2ff%2Da317c21f608a"
        selenium_tool.load_url(test_sharepoint_url)

        # Sign in: submit the account e-mail when the login form is present.
        validate_account_xpath = '//input[@type="email"]'
        elements = selenium_tool.query_elements(xpath_str=validate_account_xpath)
        if len(elements) > 0:
            selenium_tool.input_text_element(text_str="Eoin-Z.Wang@aia.com", xpath_str=validate_account_xpath)
            next_button_xpath = '//input[@type="submit"]'
            selenium_tool.query_click_button(xpath_str=next_button_xpath)

        upload_file_name = r'C:\wz\pythonFile\3333.xlsx'
        upload_button_xpath = "//button[@name='Upload']"
        upload_file_button_xpath = "//button[@name='Files']"
        local_filepath = Path(__file__).parent.joinpath(upload_file_name).resolve()
        selenium_tool.query_click_button(xpath_str=upload_button_xpath)
        selenium_tool.query_click_button(xpath_str=upload_file_button_xpath)
        selenium_tool.windows_utils.input_local_filepath_for_native_window_win32gui(filepath=str(local_filepath))
        time.sleep(1)

        # No "Replace" prompt within 5 s simply means there was no name clash.
        replace_button_xpath = '//div[@role="alert"]//button[@name="Replace"]'
        try:
            selenium_tool.query_click_button(xpath_str=replace_button_xpath, explicit_wait_timeout=5)
        except TimeoutException:
            print('timeout')
        time.sleep(1)
        selenium_tool.is_file_uploaded(filename='3333.xlsx')
    finally:
        # Always release the browser, even when a step above raised.
        selenium_tool.force_quit_driver()

def upload_file_bak():
    r"""Upload the fixed test file ``C:\wz\pythonFile\3333.xlsx`` to the EASteam test folder.

    Starts a visible Chrome session, types the sign-in e-mail into the login
    form, drives the "Upload > Files" native dialog, accepts the "Replace"
    prompt when it appears, and waits until ``3333.xlsx`` is listed.
    """
    selenium_tool = SeleniumTool(is_headless=False)
    try:
        test_sharepoint_url = "https://aiacom.sharepoint.com/sites/EASteam/Shared%20Documents/Forms/AllItems.aspx?id=%2Fsites%2FEASteam%2FShared%20Documents%2Ftest&viewid=dbf4c9a2%2Ddf5a%2D4917%2Db2ff%2Da317c21f608a"
        selenium_tool.load_url(test_sharepoint_url)

        # Sign in: submit the account e-mail when the login form is present.
        validate_account_xpath = '//input[@type="email"]'
        elements = selenium_tool.query_elements(xpath_str=validate_account_xpath)
        if len(elements) > 0:
            selenium_tool.input_text_element(text_str="Eoin-Z.Wang@aia.com", xpath_str=validate_account_xpath)
            next_button_xpath = '//input[@type="submit"]'
            selenium_tool.query_click_button(xpath_str=next_button_xpath)

        upload_file_name = r'C:\wz\pythonFile\3333.xlsx'
        upload_button_xpath = "//button[@name='Upload']"
        upload_file_button_xpath = "//button[@name='Files']"
        local_filepath = Path(__file__).parent.joinpath(upload_file_name).resolve()
        selenium_tool.query_click_button(xpath_str=upload_button_xpath)
        selenium_tool.query_click_button(xpath_str=upload_file_button_xpath)
        selenium_tool.windows_utils.input_local_filepath_for_native_window_win32gui(filepath=str(local_filepath))
        time.sleep(1)

        # No "Replace" prompt within 5 s simply means there was no name clash.
        replace_button_xpath = '//div[@role="alert"]//button[@name="Replace"]'
        try:
            selenium_tool.query_click_button(xpath_str=replace_button_xpath, explicit_wait_timeout=5)
        except TimeoutException:
            print('timeout')
        time.sleep(1)
        selenium_tool.is_file_uploaded(filename='3333.xlsx')
    finally:
        # Always release the browser, even when a step above raised.
        selenium_tool.force_quit_driver()

# Script entry point: run a single CFA upload when executed directly.
if __name__ == '__main__':
    # print(111)
    # download_files_batch_bakforOnefile('Philippines_2021-11-24.txt')
    # download_files_batch_bak('1.txt')
    # download_files_batch_bak()
    # upload_file()
    # main('fork1.txt', r'C:\wz\pythonFile')
    # main_bak()
    upload_fileforCFA('U110_Cashflow_AIA_PH_20220321160533.csv')


sim_selenium_tool.py
import os
import re
import sys
import time
from zipfile import ZipFile
from pathlib import Path
from datetime import datetime
from typing import List, Optional
from pathlib import Path
# Make this file's directory and its parent importable (e.g. for `utils.*`)
# regardless of the working directory the script is started from.
sys.path.append(str(Path(__file__).parent.resolve()))
sys.path.append(str(Path(__file__).parent.parent.resolve()))

import win32gui
import win32api

import warnings
import winreg

from requests.packages.urllib3.exceptions import InsecureRequestWarning

from win32con import WM_INPUTLANGCHANGEREQUEST, WM_SETTEXT, WM_COMMAND
import win32com.client
import requests
from selenium.webdriver import Chrome, ChromeOptions
from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.common.alert import Alert
from selenium.webdriver.common.action_chains import ActionChains
from selenium.common.exceptions import StaleElementReferenceException, SessionNotCreatedException, TimeoutException

class DownloadTimeoutException(Exception):
    """Raised when an expected download has not appeared on disk within the timeout."""

class UploadInputLocalPathFailException(Exception):
    """Raised when a local file path cannot be entered into the native file dialog
    because switching the keyboard layout did not succeed."""

class WindowsUtils(object):
    """Win32 helpers for driving the native "Open" file dialog during uploads."""

    @staticmethod
    def set_english_keyboard_layout() -> bool:
        """Ask the foreground window to switch its input language to 0x0409.

        :return: ``True`` when ``SendMessage`` returned 0.
        """
        hwnd = win32gui.GetForegroundWindow()
        # 0x0409 is the en-US language identifier.
        result = win32api.SendMessage(hwnd, WM_INPUTLANGCHANGEREQUEST, 0, 0x0409)
        return result == 0

    @classmethod
    def input_local_filepath_for_native_window_win32gui(cls, filepath: str) -> None:
        """Put ``filepath`` into the native "Open" dialog and confirm it.

        The path is written straight into the dialog's edit control with
        ``WM_SETTEXT`` and the dialog is then confirmed with ``WM_COMMAND``.

        :param filepath: absolute path of the file to select.
        :raises UploadInputLocalPathFailException: if the keyboard layout
            could not be switched.
        """
        result = cls.set_english_keyboard_layout()
        time_to_wait = 2  # must be bigger than 1, otherwise the filepath can't be input entirely
        if result:
            time.sleep(time_to_wait)
            # '#32770' is the window class of standard dialogs; the title
            # 'Open' presumably requires an English Windows UI.
            dialog = win32gui.FindWindow('#32770', 'Open')
            # Walk dialog -> ComboBoxEx32 -> ComboBox -> Edit to reach the file name box.
            combobox_ex32_field = win32gui.FindWindowEx(dialog, 0, 'ComboBoxEx32', None)
            combobox_field = win32gui.FindWindowEx(combobox_ex32_field, 0, 'ComboBox', None)
            edit_field = win32gui.FindWindowEx(combobox_field, 0, 'Edit', None)
            button = win32gui.FindWindowEx(dialog, 0, 'Button', None)
            win32gui.SendMessage(edit_field, WM_SETTEXT, None, filepath)
            # wParam 1 is IDOK, i.e. the dialog's default "Open" action.
            win32gui.SendMessage(dialog, WM_COMMAND, 1, button)
        else:
            raise UploadInputLocalPathFailException(filepath)

    @classmethod
    def input_local_filepath_for_native_window_win32com(cls, filepath: str):
        """Type ``filepath`` into the focused window via ``WScript.Shell`` and press Enter.

        :param filepath: path of the file to select; normalised through ``Path``.
        :raises UploadInputLocalPathFailException: if the keyboard layout
            could not be switched.
        """
        result = cls.set_english_keyboard_layout()
        if result:
            time.sleep(1)
            shell = win32com.client.Dispatch("WScript.Shell")
            filepath = str(Path(filepath))
            shell.Sendkeys(filepath)
            shell.Sendkeys('~')  # '~' is the SendKeys code for Enter
        else:
            raise UploadInputLocalPathFailException(filepath)

class ChromeDriverUtil(object):
    """Download and unpack the chromedriver build that matches the local Chrome."""

    def __init__(self,
                 web_driver_url: Optional[str] = None,
                 web_driver_url_endpoint: Optional[str] = None,
                 driver_path: Optional[str] = None,
                 chrome_version: Optional[str] = None):
        """
        :param web_driver_url: base URL of the driver storage; defaults to the
            public chromedriver bucket.
        :param web_driver_url_endpoint: archive name below ``<url>/<version>/``.
        :param driver_path: where ``chromedriver.exe`` must end up (required).
        :param chrome_version: Chrome version to fetch a driver for; looked up
            in the registry by :meth:`update_safe` when omitted.
        :raises FileNotFoundError: if ``driver_path`` is ``None``.
        """
        # NOTE(review): this bucket only hosts drivers up to Chrome 114; newer
        # drivers are published through the "Chrome for Testing" endpoints.
        self.web_driver_url = web_driver_url if web_driver_url is not None \
            else 'https://chromedriver.storage.googleapis.com'
        self.web_driver_url_endpoint = web_driver_url_endpoint if web_driver_url_endpoint is not None \
            else 'chromedriver_win32.zip'
        self.chrome_version = chrome_version
        self.proxies = {
            'http': 'http://HKPriProxy.aia.biz:10938',
            'https': 'http://HKPriProxy.aia.biz:10938',
        }
        self.timeout = 60
        # NOTE(review): TLS certificate verification is disabled for the download.
        self.verify = False
        self.driver_filename = 'chromedriver.exe'
        if driver_path is None:
            raise FileNotFoundError('driver path is None')
        self.driver_path = driver_path

    def filter_version(self, raw_error_msg: str):
        """Extract the four-part browser version from a driver error message.

        Stores the result in ``self.chrome_version``.

        :raises ValueError: if no ``a.b.c.d`` version can be found in the message.
        """
        reg_p = r'(?<=[Current browser version is\W])(\d*\.\d*\.\d*\.\d*)(?=[\Wwith binary path])'
        reg_result = re.search(reg_p, raw_error_msg)
        if reg_result is None:
            raise ValueError(f'no browser version found in: {raw_error_msg!r}')
        self.chrome_version = reg_result.group(0)

    @staticmethod
    def get_chrome_version() -> str:
        """Read the installed Chrome version from the current user's registry hive."""
        with winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Software\Google\CHrome\BLBeacon") as key:
            chrome_version = winreg.QueryValueEx(key, 'version')[0]
        return chrome_version

    def update_safe(self) -> Optional[Path]:
        """Download a driver, stepping the patch number down until one exists.

        Tries ``a.b.c.N``, ``a.b.c.N-1`` ... ``a.b.c.0`` exactly once each and
        leaves ``self.chrome_version`` at the last version tried.

        :return: the driver path on success, ``None`` if no candidate exists.
        """
        if self.chrome_version is None:
            self.chrome_version = self.get_chrome_version()
        version_parts = self.chrome_version.split('.')
        last_version = int(version_parts[-1])
        for patch in reversed(range(last_version + 1)):
            version_parts[-1] = str(patch)
            self.chrome_version = '.'.join(version_parts)
            try:
                return self.update()
            except FileNotFoundError:
                continue  # this patch level was never published; try the next lower one
        return None

    def update(self) -> Optional[Path]:
        """Download the driver archive for ``self.chrome_version`` and unpack it.

        :return: the path of the extracted driver.
        :raises FileNotFoundError: if the storage answers 404 for this version.
        :raises ConnectionError: for any other non-200 response.
        """
        # warnings.simplefilter('ignore', InsecureRequestWarning)
        zip_url = f"{self.web_driver_url}/{self.chrome_version}/{self.web_driver_url_endpoint}"
        r = requests.get(zip_url, proxies=self.proxies, timeout=self.timeout, verify=self.verify)
        if r.status_code == 404:
            raise FileNotFoundError(f"zip_url: {zip_url} is not found")
        elif r.status_code != 200:
            raise ConnectionError(f'{zip_url} cannot access normally')
        chrome_driver_folder_path = os.path.dirname(self.driver_path)
        chrome_driver_zip_file_path = os.path.join(chrome_driver_folder_path, 'chrome_driver.zip')

        # Close the download handle before reading the archive so the data is
        # fully flushed to disk.
        with open(chrome_driver_zip_file_path, mode='wb') as f:
            f.write(r.content)
        with ZipFile(chrome_driver_zip_file_path, 'r') as zip_file:
            if os.path.exists(self.driver_path):
                os.remove(self.driver_path)
            zip_file.extract(self.driver_filename, path=chrome_driver_folder_path)
        return Path(self.driver_path)

    @staticmethod
    def kill_process_in_windows(process_name: Optional[str] = None):
        """Force-kill every process with the given image name (default ``chromedriver.exe``)."""
        if process_name is None:
            process_name = "chromedriver.exe"
        os.system("taskkill /f /im " + process_name)

class SeleniumTool(object):
    """Convenience wrapper around a Selenium Chrome driver.

    Bundles driver start-up (including fetching a matching chromedriver),
    explicit-wait element queries and upload/download polling helpers.
    """

    LOG_FILE = Path(__file__).resolve().parent.parent\
        .parent.joinpath(f"log/driver_{datetime.strftime(datetime.now(), '%m%d%Y')}.log")

    def __init__(self, is_headless: bool = False, download_dir: Optional[Path] = None):
        """Start Chrome.

        :param is_headless: run Chrome with ``--headless`` when true.
        :param download_dir: Chrome's default download directory; left at the
            browser default when ``None``.
        """
        # Defined first so force_quit_driver()/__del__ are safe if start-up fails.
        self.driver = None
        self.explicit_wait_timeout = 30
        self.download_dir = download_dir
        self.windows_utils = WindowsUtils
        options = ChromeOptions()

        if is_headless:
            options.add_argument('--headless')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-gpu')
        options.add_argument('--ignore-certificate-errors')
        options.add_argument('--disable-notifications')
        options.add_experimental_option("detach", True)
        prefs_params = {
            'download.prompt_for_download': False,
            'profile.managed_default_content_settings.images': 2}
        if self.download_dir is not None:
            # Only set a directory when one was given; str(None) would
            # otherwise become the literal path 'None'.
            prefs_params['download.default_directory'] = str(self.download_dir)
        options.add_experimental_option("prefs", prefs_params)
        options.add_experimental_option('excludeSwitches', ['enable-automation'])
        options.add_experimental_option('useAutomationExtension', False)

        driver_path = self.get_web_driver_path()
        if not driver_path.exists():
            ChromeDriverUtil(driver_path=driver_path).update_safe()
        try:
            self.driver = Chrome(options=options, executable_path=str(driver_path))
        except SessionNotCreatedException as e:
            # The driver does not match the browser: read the browser version
            # from the error text, fetch a matching driver and retry once.
            ins = ChromeDriverUtil(driver_path=driver_path)
            ins.filter_version(raw_error_msg=e.args[0])
            ins.update_safe()
            self.driver = Chrome(options=options, executable_path=str(driver_path))
        # self.driver.set_window_size(1920, 1080)
        js_zoom_out = "document.body.style.zoom='0.5'"
        self.driver.execute_script(js_zoom_out)
        self.action_chains = ActionChains(self.driver)

    @staticmethod
    def get_web_driver_path() -> Path:
        """Return the chromedriver location for the current platform."""
        if 'linux' in sys.platform:
            return Path('/usr/bin/chromedriver')
        project_root = Path(__file__).resolve().parent.parent.parent
        return project_root.joinpath('src/utils/chromedriver.exe').resolve()

    @staticmethod
    def is_file_downloaded(file_path: Path, timeout: int = 300) -> bool:
        """Poll once per second until ``file_path`` exists.

        :raises DownloadTimeoutException: after ``timeout`` unsuccessful polls.
        """
        count_wait_time = 0
        while not file_path.exists():
            if count_wait_time == timeout:
                raise DownloadTimeoutException(f'{file_path} -> timeout: {timeout} s')
            time.sleep(1)
            count_wait_time += 1

        return file_path.exists()

    def is_file_uploaded(self, filename: str, timeout: int = 7200) -> bool:
        """Poll the document list until an entry containing ``filename`` appears.

        :param timeout: maximum number of polls; each poll sleeps one second
            on a miss, on top of the time the element lookup itself takes.
        :raises FileNotFoundError: if ``filename`` is empty.
        :raises TimeoutException: when the poll budget is used up.
        """
        if not filename:
            raise FileNotFoundError
        file_element = None
        count_wait_time = 0
        while file_element is None:
            count_wait_time += 1
            if count_wait_time == timeout:
                raise TimeoutException(filename)
            file_element = self.check_file_element_exist(filename=filename)
            if file_element is None:
                time.sleep(1)
        return True

    def check_file_element_exist(self, filename: str) -> Optional["WebElement"]:
        """Return the first listed file element whose text contains ``filename``, else ``None``."""
        for item in self.get_all_file_elements():
            try:
                if filename in item.text:
                    return item
            except StaleElementReferenceException:
                # The list re-rendered while we were reading it; skip this entry.
                continue
        return None

    def get_all_file_elements(self) -> List["WebElement"]:
        """Return the file-name buttons currently visible in the document list.

        An empty list is returned while the list surface is missing or
        collapsed, or while no file rows are visible yet.
        """
        result = []
        ms_list_surface_element_xpath = "//div[@role='presentation' and @class='ms-List-surface']"
        query_element_xpath = f"//div[@class='Files-content']//div[@class='ms-List']" \
                              f"//button[@role='link' and @data-automationid='FieldRenderer-name']"
        try:
            ms_list_surface_element = self.query_element(xpath_str=ms_list_surface_element_xpath)
            if ms_list_surface_element.rect['height'] < 1:
                return result
            return self.query_elements(xpath_str=query_element_xpath)
        except TimeoutException:
            # Nothing rendered yet; callers poll again.
            return result

    def load_url(self, url: str = None):
        """Navigate to ``url`` and resize the window to 1920x1080.

        :raises ValueError: if ``url`` is ``None`` or empty.
        """
        if url is None or len(url) == 0:
            raise ValueError(f'url {url} is not valid')
        self.driver.get(url)
        self.driver.set_window_size(1920, 1080)

    def refresh(self):
        """Reload the current page."""
        self.driver.refresh()

    def query_elements(self, xpath_str: str, explicit_wait_timeout: int = None) -> List["WebElement"]:
        """Wait until all elements matching ``xpath_str`` are visible and return them.

        :raises TimeoutException: if that does not happen within the timeout.
        """
        if explicit_wait_timeout is None:
            explicit_wait_timeout = self.explicit_wait_timeout
        wait = WebDriverWait(self.driver, explicit_wait_timeout)
        return wait.until(EC.visibility_of_all_elements_located((By.XPATH, xpath_str)))

    def query_element(self, xpath_str: str, explicit_wait_timeout: int = None) -> "WebElement":
        """Wait until one element matching ``xpath_str`` is visible and return it.

        :raises TimeoutException: if that does not happen within the timeout.
        """
        if explicit_wait_timeout is None:
            explicit_wait_timeout = self.explicit_wait_timeout
        wait = WebDriverWait(self.driver, explicit_wait_timeout)
        return wait.until(EC.visibility_of_element_located((By.XPATH, xpath_str)))

    def query_click_button(self, xpath_str: str, explicit_wait_timeout: int = None) -> Optional["WebElement"]:
        """Wait until the element at ``xpath_str`` is clickable, click it and return it.

        :raises TimeoutException: if it does not become clickable in time.
        """
        if explicit_wait_timeout is None:
            explicit_wait_timeout = self.explicit_wait_timeout

        wait = WebDriverWait(self.driver, explicit_wait_timeout)
        element = wait.until(EC.element_to_be_clickable((By.XPATH, xpath_str)))
        # self.action_chains.move_to_element(element).click(element).perform()
        element.click()
        return element

    def input_text_element(self, text_str: str, xpath_str: str, explicit_wait_timeout: int = None) -> "WebElement":
        """Type ``text_str`` into the first visible element matching ``xpath_str``."""
        elements = self.query_elements(xpath_str=xpath_str, explicit_wait_timeout=explicit_wait_timeout)
        result = elements[0]
        result.send_keys(text_str)
        return result

    def get_alert(self, explicit_wait_timeout: int = None) -> "Alert":
        """Wait for a JavaScript alert and return it."""
        if explicit_wait_timeout is None:
            explicit_wait_timeout = self.explicit_wait_timeout
        wait = WebDriverWait(self.driver, explicit_wait_timeout)
        wait.until(EC.alert_is_present())
        return self.driver.switch_to.alert

    def __del__(self):
        self.force_quit_driver()

    def force_quit_driver(self):
        """Quit the browser if one is running; safe to call repeatedly."""
        driver = getattr(self, 'driver', None)
        if driver:
            try:
                driver.quit()
            finally:
                self.driver = None
举报

相关推荐

0 条评论