Azure 容器实例管理 Python SDK 封装
测试结果
说明
- 这是根据我的需求写的,所以有些参数是写死的,比如cpu核数和内存,你可以根据你的需要自行修改。
- 前置条件:
当前环境已安装 Python 3.8 及以上版本和 Azure CLI,并且已经通过 Azure CLI 登录到你的账户
依赖安装
requirements.txt
azure-mgmt-resource
azure-identity
azure-mgmt-containerinstance
pip install -r requirements.txt
containerhelpor.py
from azure.identity import DefaultAzureCredential
from azure.mgmt.containerinstance import ContainerInstanceManagementClient
from azure.mgmt.containerinstance.models import ContainerGroup, Container, ContainerPort, IpAddress, ResourceRequests, ResourceRequirements
class ContainerHelpor:
    """Static helpers for managing a single-container Azure container group.

    Every method authenticates with ``DefaultAzureCredential`` and builds a
    fresh ``ContainerInstanceManagementClient`` for the given subscription.
    The container group is always named after ``container_name``.

    Methods are static, so they can be called on the class
    (``ContainerHelpor.create(...)``) or on an instance.
    """

    @staticmethod
    def _client(subscription_id: str) -> "ContainerInstanceManagementClient":
        """Return a management client for ``subscription_id``."""
        return ContainerInstanceManagementClient(DefaultAzureCredential(), subscription_id)

    @staticmethod
    def create(subscription_id: str, resource_group: str, location: str, container_name: str,
               container_image: str, no_port: int, protocol: str = 'TCP',
               cpu: float = 1.0, memory_in_gb: float = 1.0):
        """Create (or update) a Linux container group with one public port.

        Args:
            subscription_id: Azure subscription that owns the resource group.
            resource_group: Name of the resource group to create the group in.
            location: Azure region for the container group.
            container_name: Used as both the container name and the group name.
            container_image: Image reference for the container.
            no_port: Port exposed by the container and on the public IP.
            protocol: Port protocol, ``'TCP'`` by default.
            cpu: CPU cores requested for the container (default 1.0).
            memory_in_gb: Memory in GB requested for the container (default 1.0).

        Returns:
            The poller returned by ``begin_create_or_update``; it has already
            been waited on when this method returns.

        Raises:
            Whatever the SDK raises if the operation cannot be started or
            does not complete successfully.
        """
        ports = [ContainerPort(port=no_port, protocol=protocol)]
        requirements = ResourceRequirements(
            requests=ResourceRequests(memory_in_gb=memory_in_gb, cpu=cpu)
        )
        container = Container(
            name=container_name,
            image=container_image,
            resources=requirements,
            ports=ports,
        )
        container_group = ContainerGroup(
            location=location,
            containers=[container],
            os_type='Linux',
            ip_address=IpAddress(ports=ports, type='Public'),
        )
        poller = ContainerHelpor._client(subscription_id).container_groups.begin_create_or_update(
            resource_group, container_name, container_group
        )
        # begin_* only starts the operation; block until it finishes so the
        # success message is truthful and failures surface here.
        poller.wait()
        print(f"Container instance {container_name} created successfully.")
        return poller

    @staticmethod
    def stop(subscription_id: str, resource_group: str, container_name: str):
        """Stop the container group named ``container_name``."""
        # `stop` is not a begin_* operation, so there is no poller to wait on.
        ContainerHelpor._client(subscription_id).container_groups.stop(resource_group, container_name)
        print(f"Container instance {container_name} stoped successfully.")

    @staticmethod
    def start(subscription_id: str, resource_group: str, container_name: str):
        """Start the container group and wait for the operation to finish.

        Raises:
            Whatever the SDK raises if the start operation fails.
        """
        poller = ContainerHelpor._client(subscription_id).container_groups.begin_start(
            resource_group, container_name
        )
        poller.wait()
        print(f"Container instance {container_name} started successfully.")

    @staticmethod
    def remove(subscription_id: str, resource_group: str, container_name: str):
        """Delete the container group and wait for the deletion to finish.

        Raises:
            Whatever the SDK raises if the delete operation fails.
        """
        poller = ContainerHelpor._client(subscription_id).container_groups.begin_delete(
            resource_group, container_name
        )
        poller.wait()
        print(f"Container instance {container_name} deleted successfully.")

    @staticmethod
    def query_ip(subscription_id: str, resource_group: str, container_name: str):
        """Return the ``ip_address`` attribute of the container group.

        The returned object is whatever the SDK stores in
        ``ContainerGroup.ip_address``; callers read its ``.ip`` field.
        """
        container_group = ContainerHelpor._client(subscription_id).container_groups.get(
            resource_group, container_name
        )
        return container_group.ip_address
测试代码
import sys
import os
import time
# Put the parent of this script's directory on sys.path so that the
# `core.containerhelpor` import below resolves when the script is run directly.
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
sys.path.append(parent_dir)
from core.containerhelpor import ContainerHelpor
def main():
    """Run a create -> stop -> start -> query_ip -> remove smoke test.

    The subscription id, resource group, container name and image below are
    placeholders and must be filled in before running against a real account.
    Progress is reported with ``print``; the function returns ``None``.
    """
    print("containerhelpor test begin")
    sub_id = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
    container_name = 'test'
    resource_group = 'jp'

    print("ContainerHelpor.create begin")
    ContainerHelpor.create(
        subscription_id=sub_id,
        no_port=8080,
        resource_group=resource_group,
        location='Japan West',
        container_name=container_name,
        container_image='',  # placeholder: set the image to deploy
    )

    print("ContainerHelpor.stop begin")
    ContainerHelpor.stop(sub_id, resource_group, container_name)

    print("ContainerHelpor.start begin")
    retry_cnt = 0
    while True:
        try:
            ContainerHelpor.start(sub_id, resource_group, container_name)
            break
        except Exception as e:  # broad on purpose: any failure triggers a retry
            time.sleep(2)
            retry_cnt += 1
            if retry_cnt > 3:
                # Give up: report the last error and clean up the group.
                print(e)
                ContainerHelpor.remove(sub_id, resource_group, container_name)
                print("containerhelpor test failed")
                return

    print("ContainerHelpor.query_ip begin")
    max_attempts = 11  # same upper bound as the original retry counter (> 10)
    ip = None
    for _ in range(max_attempts):
        ipaddr = ContainerHelpor.query_ip(sub_id, resource_group, container_name)
        # ip_address may not be populated yet; treat that like "no IP yet".
        ip = ipaddr.ip if ipaddr is not None else None
        if ip is not None:
            break
        # Only wait when another attempt is actually needed.
        time.sleep(2)
    else:
        print("query_ip failed, try it later")
    print(ip)

    print("ContainerHelpor.remove begin")
    ContainerHelpor.remove(sub_id, resource_group, container_name)
    print("containerhelpor test pass")


if __name__ == "__main__":
    main()