0
点赞
收藏
分享

微信扫一扫

【24届数字IC秋招总结】提前批面试经验1——小米、百度昆仑芯、长鑫存储

八怪不姓丑 04-13 10:00 阅读 1

容器实例管理python sdk封装

测试结果

在这里插入图片描述

说明

  • 这是根据我的需求写的,所以有些参数是写死的,比如cpu核数和内存,你可以根据你的需要自行修改。
  • 前置条件:
    当前环境已安装 Python 3.8 以上版本和 Azure CLI,并且已经通过 az login 登录到你的账户

依赖安装

requirements.txt

azure-mgmt-resource
azure-identity
azure-mgmt-containerinstance
pip install -r requirements.txt

containerhelpor.py

from azure.identity import DefaultAzureCredential
from azure.mgmt.containerinstance import ContainerInstanceManagementClient
from azure.mgmt.containerinstance.models import ContainerGroup, Container, ContainerPort, IpAddress, ResourceRequests, ResourceRequirements

class ContainerHelpor:
    """Static helpers for managing a single-container Azure Container Instance.

    Every method authenticates with ``DefaultAzureCredential`` and builds a
    fresh ``ContainerInstanceManagementClient`` for the given subscription.
    The container group is always named after ``container_name``.

    All methods are static, so they can be called either on the class
    (``ContainerHelpor.create(...)``) or on an instance.
    """

    @staticmethod
    def _client(subscription_id: str):
        """Return a management client for *subscription_id* using default Azure credentials."""
        return ContainerInstanceManagementClient(DefaultAzureCredential(), subscription_id)

    @staticmethod
    def create(subscription_id: str, resource_group: str, location: str, container_name: str,
               container_image: str, no_port: int, protocol: str = 'TCP',
               cpu: float = 1.0, memory_in_gb: float = 1.0):
        """Create (or update) a Linux container group exposing one public port.

        Args:
            subscription_id: Azure subscription that owns the resource group.
            resource_group: Name of the resource group to create the group in.
            location: Azure region, e.g. ``'Japan West'``.
            container_name: Used as both the container name and the
                container group name.
            container_image: Image reference to run.
            no_port: Port number opened on the container and on the public IP.
            protocol: Port protocol, ``'TCP'`` by default.
            cpu: CPU cores requested for the container (default 1.0).
            memory_in_gb: Memory in GB requested for the container (default 1.0).

        Returns:
            The poller returned by ``begin_create_or_update``. It has already
            been waited on, so the operation is finished when this returns.

        Raises:
            Whatever the Azure SDK raises when authentication or the
            create operation fails.
        """
        container_client = ContainerHelpor._client(subscription_id)

        # The same port list is attached to the container and to the public IP.
        ports = [ContainerPort(port=no_port, protocol=protocol)]
        ip_address = IpAddress(ports=ports, type='Public')

        # Resource request for the single container in the group.
        requirements = ResourceRequirements(
            requests=ResourceRequests(memory_in_gb=memory_in_gb, cpu=cpu)
        )

        container = Container(
            name=container_name,
            image=container_image,
            resources=requirements,
            ports=ports,
        )

        # The container group is what Azure calls a "container instance".
        container_group = ContainerGroup(
            location=location,
            containers=[container],
            os_type='Linux',
            ip_address=ip_address,
        )

        poller = container_client.container_groups.begin_create_or_update(
            resource_group, container_name, container_group
        )
        # begin_* only starts the operation; wait so that the success message
        # below is true and service-side failures are not silently dropped.
        poller.wait()
        print(f"Container instance {container_name} created successfully.")
        return poller

    @staticmethod
    def stop(subscription_id: str, resource_group: str, container_name: str):
        """Stop the container group named *container_name*."""
        client = ContainerHelpor._client(subscription_id)

        client.container_groups.stop(resource_group, container_name)

        print(f"Container instance {container_name} stoped successfully.")

    @staticmethod
    def start(subscription_id: str, resource_group: str, container_name: str):
        """Start the container group named *container_name* and wait for completion."""
        client = ContainerHelpor._client(subscription_id)

        # Wait on the poller so a failed start raises here instead of being
        # reported as a success.
        client.container_groups.begin_start(resource_group, container_name).wait()

        print(f"Container instance {container_name} started successfully.")

    @staticmethod
    def remove(subscription_id: str, resource_group: str, container_name: str):
        """Delete the container group named *container_name* and wait for completion."""
        container_client = ContainerHelpor._client(subscription_id)

        # Wait on the poller so the deletion has really finished before
        # success is reported.
        container_client.container_groups.begin_delete(resource_group, container_name).wait()
        print(f"Container instance {container_name} deleted successfully.")

    @staticmethod
    def query_ip(subscription_id: str, resource_group: str, container_name: str):
        """Return the ``ip_address`` attribute of the container group.

        The returned object is whatever the SDK stores on the group; callers
        read its ``.ip`` attribute to get the address itself.
        """
        client = ContainerHelpor._client(subscription_id)

        container_group = client.container_groups.get(resource_group, container_name)

        return container_group.ip_address

测试代码

import sys
import os
import time

# Put the directory above this script on the import path so that project
# packages can be imported; adjust this to match your own project layout.
current_dir = os.path.split(os.path.abspath(__file__))[0]
parent_dir = os.path.split(current_dir)[0]
sys.path.append(parent_dir)
from core.containerhelpor import ContainerHelpor

def main():
    """Run a create -> stop -> start -> query_ip -> remove smoke test.

    Prints progress for each step and ends with either
    "containerhelpor test pass" or "containerhelpor test failed". Once the
    container group has been created it is always removed again, including
    when a later step fails or raises.
    """
    print("containerhelpor test begin")
    # Fill in your own subscription id.
    sub_id = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
    container_name = 'test'  # your container name
    resource_group = 'jp'  # your resource group name

    retry_delay = 2  # seconds between retries
    start_attempts = 4  # first try plus three retries
    ip_query_attempts = 11  # first query plus ten retries

    print("ContainerHelpor.create begin")
    ContainerHelpor.create(
        subscription_id=sub_id,
        no_port=8080,
        resource_group=resource_group,
        location='Japan West',
        container_name=container_name,
        # Image name and tag; an untagged name resolves to Docker Hub's latest.
        container_image='',
    )

    passed = False
    try:
        print("ContainerHelpor.stop begin")
        ContainerHelpor.stop(sub_id, resource_group, container_name)

        print("ContainerHelpor.start begin")
        last_error = None
        for _ in range(start_attempts):
            try:
                ContainerHelpor.start(sub_id, resource_group, container_name)
            except Exception as e:
                # Deliberately broad: any failure to start is retried after a
                # short pause, and the last error is reported if all fail.
                last_error = e
                time.sleep(retry_delay)
            else:
                last_error = None
                break
        if last_error is not None:
            print(last_error)
            return

        print("ContainerHelpor.query_ip begin")
        ip = None
        for _ in range(ip_query_attempts):
            ipaddr = ContainerHelpor.query_ip(sub_id, resource_group, container_name)
            # The group may not have an address object yet; treat it as "no IP".
            ip = ipaddr.ip if ipaddr is not None else None
            if ip is not None:
                break
            time.sleep(retry_delay)
        if ip is None:
            print("query_ip failed, try it later")
            return

        print(ip)
        passed = True
    finally:
        # Always clean up the container group, whatever happened above.
        print("ContainerHelpor.remove begin")
        ContainerHelpor.remove(sub_id, resource_group, container_name)
        print("containerhelpor test pass" if passed else "containerhelpor test failed")


if __name__ == "__main__":
    main()
举报

相关推荐

0 条评论