容器实例管理python sdk封装
 
测试结果
 

 
说明
 
- 这是根据我的需求写的,所以有些参数是写死的,比如cpu核数和内存,你可以根据你的需要自行修改。
 - 前置条件:
 当前环境已安装python3.8以上版本和azure cli并且已经登陆到你的账户 
 
依赖安装
 
requirments.txt
 
azure-mgmt-resource
azure-identity
azure-mgmt-containerinstance
 
pip install -r requirments.txt
 
 
containerhelpor.py
 
from azure.identity import DefaultAzureCredential
from azure.mgmt.containerinstance import ContainerInstanceManagementClient
from azure.mgmt.containerinstance.models import ContainerGroup, Container, ContainerPort, IpAddress, ResourceRequests, ResourceRequirements
class ContainerHelpor:
    
    def create(subscription_id:str, resource_group:str, location:str, container_name:str, container_image:str, no_port:int, protocol: str='TCP'):
     
        
        credentials = DefaultAzureCredential()
        
        
        container_client = ContainerInstanceManagementClient(credentials, subscription_id)
        
        port1 = ContainerPort(port=no_port, protocol=protocol)
        ports = [port1]
        
        
        ip_address = IpAddress(ports=ports, type='Public')
       
        
        requests = ResourceRequests(memory_in_gb=1.0, cpu=1.0)
        requirements = ResourceRequirements(requests=requests)
        
        container = Container(
            name=container_name,
            image=container_image,
            resources=requirements,
            ports=ports
        )
        
        container_group = ContainerGroup(
            location=location,
            containers=[container],
            os_type='Linux',
            ip_address=ip_address
        )
        
        container_group_result = container_client.container_groups.begin_create_or_update(resource_group, container_name, container_group)
        print(f"Container instance {container_name} created successfully.")
        return container_group_result
    def stop(subscription_id:str, resource_group:str, container_name:str):
        
        credential = DefaultAzureCredential()
        
        client = ContainerInstanceManagementClient(credential, subscription_id)
        
        
        client.container_groups.stop(resource_group, container_name)
        print(f"Container instance {container_name} stoped successfully.")
    def start(subscription_id:str, resource_group:str, container_name:str):
        
        credential = DefaultAzureCredential()
        
        client = ContainerInstanceManagementClient(credential, subscription_id)
        
        
        client.container_groups.begin_start(resource_group, container_name)
        print(f"Container instance {container_name} started successfully.")
    def remove(subscription_id:str, resource_group:str, container_name:str):
        
        credentials = DefaultAzureCredential()
        
        container_client = ContainerInstanceManagementClient(credentials, subscription_id)
        
        container_client.container_groups.begin_delete(resource_group, container_name)
        print(f"Container instance {container_name} deleted successfully.")
    def query_ip(subscription_id:str, resource_group:str, container_name:str):
        credential = DefaultAzureCredential()
        client = ContainerInstanceManagementClient(credential, subscription_id)
        container_group = client.container_groups.get(resource_group, container_name)
        return  container_group.ip_address
 
测试代码
 
import sys
import os
import time
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
sys.path.append(parent_dir)
from core.containerhelpor import ContainerHelpor
def main():
    print("containerhelpor test begin")
    
    sub_id = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
    container_name = 'test' 
    resource_group = 'jp' 
    print("ContainerHelpor.create begin")
    ContainerHelpor.create(subscription_id=sub_id,
    no_port=8080,
    resource_group=resource_group,
    location = 'Japan West',
    container_name = container_name,
    
    container_image = '' 
    )
    print("ContainerHelpor.stop begin")
    ContainerHelpor.stop(sub_id,resource_group,container_name)
    print("ContainerHelpor.start begin")
    is_not_start = True
    retry_cnt = 0
    while is_not_start:
        try:
            ContainerHelpor.start(sub_id,resource_group,container_name)
            is_not_start = False
        except Exception as e: 
            time.sleep(2)
            retry_cnt += 1
            if retry_cnt > 3:
                print(e)
                ContainerHelpor.remove(sub_id,resource_group,container_name)
                print("containerhelpor test failed")
                return
    print("ContainerHelpor.query_ip begin")
    retry_cnt = 0
    ip = None
    while ip is None:
        ipaddr = ContainerHelpor.query_ip(sub_id,resource_group,container_name)
        ip = ipaddr.ip
        time.sleep(2)
        retry_cnt += 1
        if retry_cnt > 10:
            print("query_ip failed, try it later")
            break
    print(ip)
    print("ContainerHelpor.remove begin")
    ContainerHelpor.remove(sub_id,resource_group,container_name)
    print("containerhelpor test pass")
if __name__ == "__main__":
    main()