0
点赞
收藏
分享

微信扫一扫

面对对象&网络&并发编程(4)

简单聊育儿 04-12 14:00 阅读 0
网络python

面向对象&网络&并发编程

1.面向对象

  • 面向对象,Python中支持两种编程方式来写代码,分别是:函数式编程面向对象式编程

    • 函数式

      # 函数式
      # # 定义函数,在函数中实现功能
      
      
      def func():
          print("面向对象")
      
      
      # 执行函数
      func()
      
    • 面向对象

      # 面向对象
      # # 定义类
      
      class Foo(object):
          # 在类中定义方法
          def func(self):
              print("面向对象")
      
      
      # 实例化类的对象
      obj = Foo()
      
      # 执行类中的方法
      obj.func()
      

1.面向对象基础

1.初始面向对象
  • 定义类,在类中定义方法,在方法中去实现具体的功能

  • 实例化类并创建一个对象,通过对象去调用并执行方法

    class Message:
        '''
        注意:
            1.类名称首字母大写 & 驼峰式命名
            2.py3之后默认类都继承object
            3.在类中编写的函数称为方法
            4.每个方法的第一参数是self
        '''
        def send_email(self, email, content):
            data = "给{}发邮件,内容是: {}".format(email, content)
            print(data)
    
    
    msg_object = Message()
    msg_object.send_email("29***651@qq.com", "注册成功")
    
  • 类中可以定义多个方法

    class Message:
    
        def send_email(self, email, content):
            data = "给{}发邮件,内容是: {}".format(email, content)
            print(data)
    
        def send_wechat(self, vid, content):
            data = "给{}发微信,内容是: {}".format(vid, content)
            print(data)
    
    
    msg_object = Message()
    msg_object.send_email("29***651@qq.com", "注册成功")
    msg_object.send_wechat("Ailke", "注册成功")
    
1.对象和self
  • 在每个类都可以定义个特殊的: **___init_**初始化方法,在实例化类创建对象时自动执行,即: 对象=类()

    class Message:
        
        '''
        对象 = 类名()   自动执行类中的__init__方法
        
        1.根据类型创建一个对象,内存的一块 区域
        2.执行__init__方法,模块会创建的那块区域的内存地址当self参数传递进去      往区域中(data = "注册成功" )
        '''
    
        def __init__(self, content):
            self.content = content
    
        def send_email(self, email):
            data = "给{}发邮件,内容是: {}".format(email, self.content)
            print(data)
    
        def send_wechat(self, vid):
            data = "给{}发微信,内容是: {}".format(vid, self.content)
            print(data)
    
    
    msg_object = Message("注册成功")
    msg_object.send_email("29***651@qq.com")  # 给29***651@qq.com发邮件,内容是:注册成功
    msg_object.send_wechat("Ailke")  # 给Ailke发微信,内容是:注册成功
    
    • 对象,可以在它的内部先封装一部分数据,以后想要使用时,再去里面获取
    • self, 类中的方法需要要由这个类的对象来触发并执行(对象.方法名),且再执行时会自动将对象当作参数传递给self,以供方法中获取已封装的值
    • 注:除了self默认参数以外,方法中的参数的定义和执行与函数是相同
  • 根据类创建多个对象并执行其中的方法

    class Message:
    
        '''
        对象 = 类名()   自动执行类中的__init__方法
    
        1.根据类型创建一个对象,内存的一块 区域
        2.执行__init__方法,模块会创建的那块区域的内存地址当self参数传递进去      往区域中(data = "注册成功" )
        '''
    
        def __init__(self, content):
            self.content = content
    
        def send_email(self, email):
            data = "给{}发邮件,内容是: {}".format(email, self.content)
            print(data)
    
        def send_wechat(self, vid):
            data = "给{}发微信,内容是: {}".format(vid, self.content)
            print(data)
    
    
    msg_object = Message("注册成功")
    msg_object.send_email("29***651@qq.com")
    msg_object.send_wechat("Ailke")
    
    login_object = Message("登录成功")
    login_object.send_email("29***651@qq.com")
    login_object.send_wechat("Ailke")
    
    • 面对对象的思想
      • 将一些数据封装到对象中,在执行方法时,再去对象中获取
    • 函数式的思想
      • 函数内部需要的数据均通过参数的形式传递
2.常见成员
  • 实例变量,属于对象,只能通过对象调用

  • 绑定方法,属于类,通过 对象调用通过类调用

    class Person:
    
        country = "中国"
        
        def __init__(self, name, age):
            self.name = name
            self.age = age
    
        # 绑定方法
        def show(self):
            print("{}的年龄是{}".format(self.name, self.age))
    
        def all_message(self):
            print("我是{}人,我叫{},我的年龄是{}".format(Person.country, self.name, self.age))
    
        def total_message(self):
            msg = "我是{}人,我叫{},我的年龄是{}".format(self.country, self.name, self.age)
            print(msg)
    
    
    # 执行绑定方法
    p1 = Person("Ailke", 20)
    p1.show()
    # 都是调用show函数,效果类似
    p2 = Person("Ailke", 25)
    Person.show(p2)
    
2.三大特性
1.封装
  • 将同一类方法封装到一个类中
  • 将数据封装到对象中,再实例化一个对象是,可以通过**__init__**初始化方法在对象中封装一些数据,以便以后使用
2.继承
  • 子类可以继承父类的方法和类变量(注意不是拷贝)

  • Python支持多继承,先继承左边,再继承右边

  • 执行对象.方法时,优先去当前对象所关联的类中找,没有再去它的父类中查找

  • self到底是谁,去self对应的类中去获取成员,没有就按照继承关系向上查找

    class TCPSever:
        def fun(self):
            print("TCPSever")
    
    
    class ThreadingMixIn:
        def fun(self):
            print("ThreadingMixIn")
    
    
    class ThreadingTCPServer(ThreadingMixIn, TCPSever):
        def run(self):
            print("before")
            self.fun()
            print("after")
    
    
    '''
    before
    ThreadingMixIn
    after
    '''
    obj = ThreadingTCPServer()
    obj.fun()
    
3.多态
  • 其他语言多态

    • 接口抽象类抽象方法来实现,让数据可以以多种形态存在(JAVA语言)
    abstract class Animal {  
        abstract void eat();  
    }  
    
    class Cat extends Animal {  
        public void eat() {  
            System.out.println("吃鱼");  
        }  
    }
    
    class Dog extends Animal {  
        public void eat() {  
            System.out.println("吃骨头");  
        }  
        public void work() {  
            System.out.println("看家");  
        }  
    }
    
    
    public class Test {
       public static void main(String[] args) {
           obj1 = Cat()
           show(obj1)
               
    	   obj2 = Dog()
    	   show(obj2)
       }  
        
        public static void show(Animal a)  {
          a.eat()
        }  
    } 
    
  • Python中多态

    • Python中则不一样,由于Python对数据类型没有任何限制,所以天生支持多态

      
      class Email(object):
          def send(self):
              print("发送邮件")
      
      
      class Message(object):
          def send(self):
              print("发送短信")
      
      
      def func(arg):
          v1 = arg.send()
          print(v1)
      
      
      v1 = Email()
      func(v1)
      
      
      v2 = Message()
      func(v2)
      

2.面向对象进阶

1.成员
1.变量
  • 实例变量

    • 属于对象,每个对象中各自维护自己的数据
  • 类变量

    • 属于类,可以被所有对象共享,一般用于给对象提供公共数据(类似于全局变量)

      class Person(object):
          country = "中国"
      
          def __init__(self, name, age):
              self.name = name
              self.age = age
      
          def show(self):
              message = "{}~{}~{}".format(self.country, self.name, self.age)
              print(message)
      
      
      print(Person.country)  # 中国
      
      
      p1 = Person("Ailke", 20)
      print(p1.name)  # Ailke
      print(p1.country)  # 中国
      p1.show()  # 中国~Ailke~20
      
      p2 = Person("Tom", 25)
      p2.country = "美国"
      p2.show()  # 美国~Tom~25
      
      Person.country = "USA"
      p1.show()  # USA~Ailke~20
      
      p2.show()  # 美国~Tom~25
      
2.方法
  • 绑定方法,默认有一个self参数,由对象进行调用

  • 类方法,默认有一个cls参数,用类或对象可以调用

  • 静态方法,无默认参数,用类或对象都可以调用

    class Foo(object):
        def __init__(self, name, age):
            self.name = name
            self.age = age
    
        def show(self):
            print("{}~{}".format(self.name, self.age))
    
        @classmethod
        def func(cls):
            print("类方法", cls)
    
        @staticmethod
        def static_func():
            print("静态方法")
    
    # 绑定对象(方法)
    obj = Foo("Ailke", 20)
    obj.show()  # Ailke~20
    Foo.show(obj)  # # Ailke~20
    
    # 类方法
    Foo.func()  # 类方法 <class '__main__.Foo'>
    obj.func()  # 类方法 <class '__main__.Foo'>
    
    # 静态方法
    Foo.static_func()  # 静态方法
    obj.static_func()  # 静态方法
    
    • 注:
      • Python,方法可以通过对象或类进行调用
      • JAVA、c#等语言中,绑定方法只能由对象调用类方法或静态方法只能由类调用
3.属性
  • 属性其实是由绑定方法 + 特殊装饰器 组合创造出来的,以后调用方法时可以不加括号

    # 属性
    class Foo(object):
        def __init__(self, name, age):
            self.name = name
            self.age = age
    
        def func1(self):
            return 123
    
        @property
        def func2(self):
            return self.name
    
    
    obj = Foo("Ailke", 20)
    print(obj.func1())  # 123
    print(obj.func2)  # Ailke
    
  • 属性的编写有两种方式

    • 基于装饰器

      • @property@<property_name>.setter, @<property_name>.deleter装饰器,被用于定义一个类的属性获取(get)、设置(set)和删除(delete)方法
      • @property装饰器用于将方法转换为只读属性
      class C(object):
      
          @property
          def func1(self):
              pass
      
          @func1.setter
          def func1(self, value):
              pass
      
          @func1.deleter
          def func1(self):
              pass
      
      
      obj = C()
      obj.func1
      obj.func1 = 123
      del obj.func1
      
    • 基于定义变量

      class C(object):
      
          def getx(self):
              pass
      
          def setx(self, value):
              pass
      
          def delx(self):
              pass
      
          x = property(getx, setx, delx, "I'm the 'x' property.")
      
      
      obj = C()
      
      obj.x
      obj.x = 123
      del obj.x
      
2.成员修饰符
  • Python中成员的修饰符是指的是: 公有、私有

    • 公有,在任何地方都可以调用的这个成员

    • 私有,只有在类的内部才可以调用该成员(成员是以两个下划线开头,则表示该成员为私有)

      class Member_modifier(object):
      
          def __init__(self, name, age):
              self.__name = name
              self.age = age
      
          def get_data(self):
              return self.__name
      
          def get_age(self):
              return self.age
      
      # 公有成员
      obj = Member_modifier("Ailke", 20)
      print(obj.age)
      v1 = obj.get_age()
      print(v1)
      
      # 私有成员
      # print(obj.__name)  # error,由于是私有成员,只能在类中进行使用
      v2 = obj.get_data()
      print(v2)  # Ailke
      
      

      注: 父类中的私有成员,子类无法继承

3.对象嵌套
  • 在基于面向对象进行编程时,对象之间可以存在各种各样的关系

    # 对象嵌套
    
    # 学生
    class Student(object):
        def __init__(self, name, age):
            self.name = name
            self.age = age
    
        def message(self):
            data = "我是一名学生,我的名字叫{},我今年{}岁了".format(self.name, self.age)
            print(data)
    
    
    stu1 = Student("张三", 18)
    stu2 = Student("李四", 19)
    stu3 = Student("王五", 20)
    
    
    # 班级
    class Classes(object):
    
        def __init__(self, title):
            self.title = title
            self.students = []
    
        def add_student(self, student):
            self.students.append(student)
    
        def add_students(self, students):
            for stu in students:
                self.add_student(stu)
    
        def show_members(self):
            for item in self.students:
                item.message()
    
    cla = Classes("高三二班")
    cla.add_student(stu1)
    cla.add_students([stu2, stu3])
    
    print(cla.title)
    cla.show_members()
    
4.特殊成员
  • 在Python的类中存在一些特殊的方法,这些方法都是**__方法__**格式,这种方法在内部均有特殊的含义

    • __init__,初始化方法

    • __new__,构造方法

    • __call__

      class CallableClass:
          def __init__(self, value):
              self.value = value
      
          def __call__(self, *args, **kwargs):
              print(f"CallableClass called with {args} and {kwargs}")
              return self.value
      
      # 实例化
      c = CallableClass(10)
      # 调用__call__函数,可以直接调用
      result = c()  # CallableClass called with () and {}
      print(result)
      # 也可以给c()传递参数
      
    • __str__

      class Person:  
          def __init__(self, name, age):  
              self.name = name  
              self.age = age  
        
          def __str__(self):  
              return f"Person(name={self.name}, age={self.age})"  
        
      # 创建一个Person对象  
      person = Person("Alice", 30)  
        
      # 打印对象,这会调用__str__方法  
      print(person)  # 输出: Person(name=Alice, age=30)  
        
      # 将对象转换为字符串并与另一个字符串拼接  
      description = str(person) + " is a nice person."  
      print(description)  # 输出: Person(name=Alice, age=30) is a nice person.
      
    • __dict__

      • 注意点,__dict__不推荐用来修改对象的属性,它可能绕开属性访问控制
      class MyClass:  
          def __init__(self, name, age):  
              self.name = name  
              self.age = age  
        
      # 创建一个MyClass的实例  
      obj = MyClass("Alice", 30)  
        
      # 访问对象的__dict__属性  
      print(obj.__dict__)  # 输出: {'name': 'Alice', 'age': 30}  
        
      # 通过__dict__修改对象的属性  
      obj.__dict__['age'] = 31  
      print(obj.age)  # 输出: 31  
        
      # 也可以向__dict__添加新的属性  
      obj.__dict__['city'] = 'New York'  
      print(obj.city)  # 输出: New York
      
    • __getitem__、__setitem__ 、__delitem__

      class MySequence:  
          def __init__(self, data):  
              self.data = data  
        
          def __getitem__(self, index):  
              return self.data[index]  
        
          def __setitem__(self, index, value):  
              self.data[index] = value  
        
          def __delitem__(self, index):  
              del self.data[index]  
        
      seq = MySequence([1, 2, 3, 4, 5])  
      del seq[2]  
      print(seq[2])  # 输出: 4
      
    • __enter__ 、__exit__

      class MyContextManager:  
          def __enter__(self):  
              print("Entering the context")  
              return self  
        
          def __exit__(self, exc_type, exc_val, exc_tb):  
              print("Exiting the context")  
              if exc_type is not None:  
                  print(f"Caught an exception: {exc_type.__name__}")  
        
      with MyContextManager():  
          print("Inside the with block")  
          raise ValueError("An error occurred")
      
    • __add__

      class Point:  
          def __init__(self, x, y):  
              self.x = x  
              self.y = y  
        
          def __add__(self, other):  
              x = self.x + other.x  
              y = self.y + other.y  
              return Point(x, y)  
        
      p1 = Point(1, 2)  
      p2 = Point(3, 4)  
      p3 = p1 + p2  
      print(p3.x, p3.y)  # 输出: 4, 6
      
    • __iter__

      • 1.当类中定义了 _iter_ 和 __next__ 两个方法。
      • 2.__iter__ 方法需要返回对象本身,即:self
      • 3.__next__ 方法,返回下一个数据,如果没有数据了,则需要抛出一个StopIteration的异常
      class MyIterable:  
          def __init__(self, data):  
              self.data = data  
              self.index = 0  
        
          def __iter__(self):  
              return self  
        
          def __next__(self):  
              if self.index < len(self.data):  
                  result = self.data[self.index]  
                  self.index += 1  
                  return result  
              else:  
                  raise StopIteration  
        
      # 使用自定义的迭代器  
      my_iterable = MyIterable([1, 2, 3, 4, 5])  
        
      # 使用for循环遍历  
      for item in my_iterable:  
          print(item)  
        
      # 或者使用iter和next手动遍历  
      iterator = iter(my_iterable)  
      try:  
          while True:  
              print(next(iterator))  
      except StopIteration:  
          pass
      

3.面向对象高级和应用

1.继承
  • 继承存在意义: 将公共的方法提取到父类中,有利于增加代码重用性

  • 继承编写方式

    # 单继承
    class Base(object):
        pass
    class Foo(Base):
        pass
    
    
    # 多继承
    class Base(object):
        pass
    
    class Bar(object):
        pass
    
    class Foo(Base,Bar):
        pass
    
  • 调用类中的成员时,遵循

    • 优先在自己所在类中找,没有的话则去父类中找
    • 如果类存在多继承(多个父类),则先找左边再找右边
1.mro和c3算法
  • 如果类存在继承关系,可以通过mro()获取当前类的继承关系(成员的顺序)

    class Base(object):
        pass
    
    class Adds(object):
        pass
    
    class Subs(Adds):
        pass
    
    class Muls(Base, Subs):
        pass
    
    print(Muls.mro())  # [<class '__main__.Muls'>, <class '__main__.Base'>, <class '__main__.Subs'>, <class '__main__.Adds'>, <class 'object'>]
    
  • 规则: 从左到右,深度优先,大小钻石,留住顶端

2.py2和py3区别
  • py2经典类,未继承object类型[从左到右,深度优先,大小钻石,不留顶端]
  • py2新式类,直接或间接继承object类型[从左到右,深度优先,大小钻石,留住顶端]
  • py3只保留新式类,默认继承object类型
2.内置函数补充
3.异常处理
1.异常处理的基本格式
try:
    # 逻辑代码
except Exception as e:
    # try中代码如果异常,则此代码块中的代码会执行
try:
    # 逻辑代码
except Exception as e:
    # try中代码如果有异常,则此代码中的代码会执行
finally:
    # try中的代码无论是否报错,finally中的代码都会执行,一般用于释放资源
1.异常细分
import requests
from requests import exceptions

while True:
    url = input("请输入要下载网页地址:")
    try:
        res = requests.get(url=url)
        print(res)    
    except exceptions.MissingSchema as e:
        print("URL架构不存在")
    except exceptions.InvalidSchema as e:
        print("URL架构错误")
    except exceptions.InvalidURL as e:
        print("URL地址格式错误")
    except exceptions.ConnectionError as e:
        print("网络连接错误")
    except Exception as e:
        print("代码出现错误", e)
        
# 提示:如果想要写的简单一点,其实只写一个Exception捕获错误就可以了。
2.Python内置了很多异常类型
常见异常:
"""
AttributeError 试图访问一个对象没有的属性,比如foo.x,但是foo没有属性x
IOError 输入/输出异常;基本上是无法打开文件
ImportError 无法引入模块或包;基本上是路径问题或名称错误
IndentationError 语法错误(的子类) ;代码没有正确对齐
IndexError 下标索引超出序列边界,比如当x只有三个元素,却试图访问n x[5]
KeyError 试图访问字典里不存在的键 inf['xx']
KeyboardInterrupt Ctrl+C被按下
NameError 使用一个还未被赋予对象的变量
SyntaxError Python代码非法,代码不能编译(个人认为这是语法错误,写错了)
TypeError 传入对象类型与要求的不符合
UnboundLocalError 试图访问一个还未被设置的局部变量,基本上是由于另有一个同名的全局变量,
导致你以为正在访问它
ValueError 传入一个调用者不期望的值,即使值的类型是正确的
"""
更多异常:
"""
ArithmeticError
AssertionError
AttributeError
BaseException
BufferError
BytesWarning
DeprecationWarning
EnvironmentError
EOFError
Exception
FloatingPointError
FutureWarning
GeneratorExit
ImportError
ImportWarning
IndentationError
IndexError
IOError
KeyboardInterrupt
KeyError
LookupError
MemoryError
NameError
NotImplementedError
OSError
OverflowError
PendingDeprecationWarning
ReferenceError
RuntimeError
RuntimeWarning
StandardError
StopIteration
SyntaxError
SyntaxWarning
SystemError
SystemExit
TabError
TypeError
UnboundLocalError
UnicodeDecodeError
UnicodeEncodeError
UnicodeError
UnicodeTranslateError
UnicodeWarning
UserWarning
ValueError
Warning
ZeroDivisionError
"""
2.自定义异常&抛出异常
  • 自定义的异常,想要触发,需要使用 raise 异常类型() 实现

    class EmailValidError(Exception):
        title = "邮箱格式错误"
    
    class ContentRequiredError(Exception):
        title = "文本不能为空错误"
        
    def send_email(email,content):
        if not re.match("\w+@live.com",email):
            raise EmailValidError()
    	if len(content) == 0 :
            raise ContentRequiredError()
    	# 发送邮件代码...
        # ...
    
    def execute():
        # 其他代码
        # ...
        
    	try:
            send_email(...)
        except EmailValidError as e:
            pass
        except ContentRequiredError as e:
            pass
        except Exception as e:
            print("发送失败")
    
    execute()
    
    # 提示:如果想要写的简单一点,其实只写一个Exception捕获错误就可以了。
    
3.特殊的finally
  • 在try或except中即便使用了return,也会执行最后的finally块中的代码
4.反射
  • 反射,提供了一种更加灵活的方式让你可以实现去 对象 中操作成员

  • Python中提供了4个内置函数来支持反射

    • getattr

      • 去对象中获取成员

        v1 = getattr(对象, "成员名称")
        v2 = getattr(对象, "成员名称", 不存在时的默认值)
        
    • setattr

      • 去对象中设置成员

        setattr(对象, "成员名称",)
        
    • hasattr

      • 对象中是否包含成员

        v1 = hasattr(对象, "成员名称")
        
    • delattr

      • 删除对象中的成员

        delattr(对象, "成员名称")
        

2.网络

1.socket

1.简单服务器链接
  • Python中内置了一个socket模块,可以快速实现网络之间进行传输数据

  • 服务端,放在云服务器中(有固定IP)

    import socket
    
    # 创建socket对象
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # ip和端口
    s.bind(('127.0.0.1', 9898))
    # 支持等待数量
    s.listen(5)
    
    while True:
        # 等待,直到有客户端连接
        conn, addr = s.accept()
        # 接收数据
        data = conn.recv(1024)
        print(data.decode('utf-8'))
        # 发送数据
        conn.send("hello".encode('utf-8'))
        # 关闭连接
        conn.close()
    
    # 停止服务端程序
    s.close()
    
  • 客户端,放在用户电脑

    import socket
    
    # 向指定IP发送链接请求
    client = socket.socket()
    client.connect(('127.0.0.1', 9898))
    
    # 链接成功后发送消息
    client.send("Hello".encode('utf-8'))
    
    # 等待服务器端消息回复
    reply = client.recv(1024)
    print(reply.encode('utf-8'))
    
    # 关闭链接
    
    client.close()
    
2.文件上传
  • 服务端

    # server.py  
    import socket  
    import os  
      
    # 设置IP地址和端口号  
    SERVER_IP = '128.0.2.6'  
    SERVER_PORT = 12345  
    BUFFER_SIZE = 4096  
      
    # 创建socket对象  
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  
      
    # 绑定IP地址和端口号  
    server_socket.bind((SERVER_IP, SERVER_PORT))  
      
    # 开始监听连接  
    server_socket.listen(1)  
    print(f"Server is listening on {SERVER_IP}:{SERVER_PORT}...")  
      
    while True:  
        # 接受客户端连接  
        client_socket, client_address = server_socket.accept()  
        print(f"Accepted connection from {client_address}")  
      
        # 接收文件名  
        filename = client_socket.recv(BUFFER_SIZE).decode()  
        if not filename:  
            break  
      
        # 创建文件以写入数据  
        with open(filename, 'wb') as f:  
            while True:  
                # 接收文件数据  
                data = client_socket.recv(BUFFER_SIZE)  
                if not data:  
                    break  
                # 写入文件  
                f.write(data)  
          
        # 关闭客户端连接  
        client_socket.close()  
        print(f"File {filename} received from {client_address}")  
      
    # 关闭服务器socket  
    server_socket.close()
    
  • 客户端

    # client.py  
    import socket  
    import os  
      
    # 设置服务器IP地址和端口号  
    SERVER_IP = '128.0.2.6'  
    SERVER_PORT = 12345  
    BUFFER_SIZE = 4096  
      
    # 创建socket对象  
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  
      
    # 连接到服务器  
    client_socket.connect((SERVER_IP, SERVER_PORT))  
      
    # 选择要上传的文件  
    file_path = input("请输入要上传的文件路径: ")  
    if not os.path.isfile(file_path):  
        print("文件不存在,请检查路径是否正确。")  
        client_socket.close()  
        exit()  
      
    # 发送文件名给服务器  
    filename = os.path.basename(file_path)  
    client_socket.sendall(filename.encode())  
      
    # 打开文件并发送数据  
    with open(file_path, 'rb') as f:  
        for data in iter(lambda: f.read(BUFFER_SIZE), b''):  
            client_socket.sendall(data)  
      
    # 关闭客户端socket  
    client_socket.close()  
    print(f"文件 {filename} 已上传到服务器。")
    

2.OSI 7层模型

  • 应用层: 规定数据的格式
  • 表示层: 对应用层数据的编码、解/压缩、分块、加/解密等任务
  • 会话层: 负责与目标建立、中断连接
  • 传输层: 建立端口到端口的通信,其实就确定双方端口信息
  • 网络层: 标记目标IP信息(IP协议)
  • 数据链路层: 对数据进行分组并设置源和目标mac地址
  • 物理层: 将二进制数据在物理层媒介上传输

3.UDP和TCP

  • UDP,用户数据报协议,是一个无连接的简单的面向数据报的传输协议。UDP不提供可靠性,它只是把应用程序传给IP层的数据报发送出去,但是并不能保证它们能到达目的地;由于UDP在传输数据报前不用在客户和服务器之间建立一个链接,且没有超市重发机制,故传输速度很快

    • 服务端
    import socket
    
    server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    server.bind(('127.0.0.1', 8002))
    
    while True:
        data, (host, port) = server.recvfrom(1024) # 阻塞
        print(data, host, port)
        server.sendto("好的".encode('utf-8'), (host, port))
    
    • 客户端

      import socket
      
      client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
      while True:
          text = input("请输入要发送的内容:")
          if text.upper() == 'Q':
              break
          client.sendto(text.encode('utf-8'), ('127.0.0.1', 8002))
          data, (host, port) = client.recvfrom(1024)
          print(data.decode('utf-8'))
      
      client.close()
      
  • TCP,是面向链接的协议,在收发数据前,必须和对方建立可靠性链接,再进行收发数据

1.TCP三次握手和四次挥手
  • 基于TCP连接进行通信
    • 创建链接,客户端和服务端要进行三次握手
      • 第一次握手:客户端发送一个带有SYN标志的连接请求数据包给服务端,表示请求连接,并等待服务器的确认。此时,客户端进入SYN_SEND状态。
      • 第二次握手:服务端收到SYN包后,会确认客户的SYN(通过发送ACK=j+1),并同时发送一个带有SYN标志的数据包(SYN=k),即SYN+ACK包,表示确认客户端的请求并请求建立连接。此时,服务器进入SYN_RECV状态。
      • 第三次握手:客户端收到服务器的SYN+ACK包后,向服务器发送一个带有ACK标志的应答数据包(ACK=k+1),表示确认服务器的请求,并通知服务器客户端已经准备好发送和接收数据。一旦这个包发送完毕,客户端和服务器就进入了ESTABLISHED状态,表示TCP连接已经成功建立。
    • 传输数据
    • 关闭链接,客户端和服务端要进行4次挥手
      • 第一次挥手:主动关闭方(例如客户端)发送一个FIN报文,用来关闭主动方到被动关闭方(例如服务端)的数据传送。这表示主动关闭方已经没有数据要发送给被动关闭方了,但此时主动关闭方还可以接受数据。
      • 第二次挥手:被动关闭方收到FIN报文后,发送一个ACK报文给主动关闭方,确认序号为收到序号加1,表示收到了关闭请求。
      • 第三次挥手:被动关闭方发送一个FIN报文,用来关闭被动关闭方到主动关闭方的数据传送,表示被动关闭方的数据也发送完了,不会再发送数据。
      • 第四次挥手:主动关闭方收到FIN报文后,回复一个ACK报文,并进入TIME_WAIT状态,等待2MSL(最长报文段寿命)时间后关闭连接。被动关闭方收到ACK报文后,进入CLOSED状态,连接完全关闭。

4.粘包

  • 对于发送者,执行 sendall/send 发送消息时,是将数据先发送至自己网卡的 写缓冲区 ,再由缓冲区将数据发送给到对方网卡的读缓冲区。
  • 对于接受者,执行 recv 接收消息时,是从自己网卡的读缓冲区获取数据
1.粘包问题
  • 每次发送消息时,将消息划分为头部(固定长度) 和 数据两部分
  • 头部需要一个数字并固定4个字节,可以借助python的struct包来实现
2.消息&文件上传
  • 服务端

    import os
    import json
    import socket
    import struct
    
    
    def recv_data(conn, chunk_size=1024):
        # 获取头部信息:数据长度
        has_read_size = 0
        bytes_list = []
        while has_read_size < 4:
            chunk = conn.recv(4 - has_read_size)
            has_read_size += len(chunk)
            bytes_list.append(chunk)
        header = b"".join(bytes_list)
        data_length = struct.unpack('i', header)[0]
    
        # 获取数据
        data_list = []
        has_read_data_size = 0
        while has_read_data_size < data_length:
            size = chunk_size if (data_length - has_read_data_size) > chunk_size else data_length - has_read_data_size
            chunk = conn.recv(size)
            data_list.append(chunk)
            has_read_data_size += len(chunk)
    
        data = b"".join(data_list)
    
        return data
    
    
    def recv_file(conn, save_file_name, chunk_size=1024):
        save_file_path = os.path.join('files', save_file_name)
        # 获取头部信息:数据长度
        has_read_size = 0
        bytes_list = []
        while has_read_size < 4:
            chunk = conn.recv(4 - has_read_size)
            bytes_list.append(chunk)
            has_read_size += len(chunk)
        header = b"".join(bytes_list)
        data_length = struct.unpack('i', header)[0]
    
        # 获取数据
        file_object = open(save_file_path, mode='wb')
        has_read_data_size = 0
        while has_read_data_size < data_length:
            size = chunk_size if (data_length - has_read_data_size) > chunk_size else data_length - has_read_data_size
            chunk = conn.recv(size)
            file_object.write(chunk)
            file_object.flush()
            has_read_data_size += len(chunk)
        file_object.close()
    
    
    def run():
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # IP可复用
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    
        sock.bind(('127.0.0.1', 8001))
        sock.listen(5)
        while True:
            conn, addr = sock.accept()
    
            while True:
                # 获取消息类型
                message_type = recv_data(conn).decode('utf-8')
                if message_type == 'close':  # 四次挥手,空内容。
                    print("关闭连接")
                    break
                # 文件:{'msg_type':'file', 'file_name':"xxxx.xx" }
                # 消息:{'msg_type':'msg'}
                message_type_info = json.loads(message_type)
                if message_type_info['msg_type'] == 'msg':
                    data = recv_data(conn)
                    print("接收到消息:", data.decode('utf-8'))
                else:
                    file_name = message_type_info['file_name']
                    print("接收到文件,要保存到:", file_name)
                    recv_file(conn, file_name)
    
            conn.close()
        sock.close()
    
    
    if __name__ == '__main__':
        run()
    
  • 客户端

    import os
    import json
    import socket
    import struct
    
    
    def send_data(conn, content):
        data = content.encode('utf-8')
        header = struct.pack('i', len(data))
        conn.sendall(header)
        conn.sendall(data)
    
    
    def send_file(conn, file_path):
        file_size = os.stat(file_path).st_size
        header = struct.pack('i', file_size)
        conn.sendall(header)
    
        has_send_size = 0
        file_object = open(file_path, mode='rb')
        while has_send_size < file_size:
            chunk = file_object.read(2048)
            conn.sendall(chunk)
            has_send_size += len(chunk)
        file_object.close()
    
    
    def run():
        client = socket.socket()
        client.connect(('127.0.0.1', 8001))
    
        while True:
            """
            请发送消息,格式为:
                - 消息:msg|你好呀
                - 文件:file|xxxx.png
            """
            content = input(">>>")  # msg or file
            if content.upper() == 'Q':
                send_data(client, "close")
                break
            input_text_list = content.split('|')
            if len(input_text_list) != 2:
                print("格式错误,请重新输入")
                continue
    
            message_type, info = input_text_list
    
            # 发消息
            if message_type == 'msg':
    
                # 发消息类型
                send_data(client, json.dumps({"msg_type": "msg"}))
    
                # 发内容
                send_data(client, info)
    
            # 发文件
            else:
                file_name = info.rsplit(os.sep, maxsplit=1)[-1]
    
                # 发消息类型
                send_data(client, json.dumps({"msg_type": "file", 'file_name': file_name}))
    
                # 发内容
                send_file(client, info)
    
        client.close()
    
    
    if __name__ == '__main__':
        run()
    

3.并发编程

1.进程和线程

  • 线程,是计算机中可以被cpu调度得最小单元
  • 进程,是计算机资源分配的最小单位(进程为线程提供资源)
  • 一个进程中可以有多个线程,同一个进程中的线程可以共享此进程中的资源
1.多线程
import threading
import time

import requests

url_list = [
    ("东北F4模仿秀.mp4", "https://aweme.snssdk.com/aweme/v1/playwm/?video_id=v0300f570000bvbmace0gvch7lo53oog"),
    ("卡特扣篮.mp4", "https://aweme.snssdk.com/aweme/v1/playwm/?video_id=v0200f3e0000bv52fpn5t6p007e34q1g"),
    ("罗斯mvp.mp4", "https://aweme.snssdk.com/aweme/v1/playwm/?video_id=v0200f240000buuer5aa4tij4gv6ajqg"),
    ("B站.mp4", "https://www.bilibili.com/video/BV1h1421m7pL/?spm_id_from=333.1007.tianma.1-1-1.click&vd_source=f7031923c3818d4ee69e34c6533c5356")
]


def task(file_name, video_url):
    res = requests.get(video_url)
    with open(file_name, "wb") as f:
        f.write(res.content)
    print(time.time())


for name, url in url_list:
    # 创建线程,让每个线程都去执行task函数(参数不同)
    t = threading.Thread(target=task, args=(name, url))
    t.start()
2.多进程
import multiprocessing
import time

import requests

url_list = [
    ("东北F4模仿秀.mp4", "https://aweme.snssdk.com/aweme/v1/playwm/?video_id=v0300f570000bvbmace0gvch7lo53oog"),
    ("卡特扣篮.mp4", "https://aweme.snssdk.com/aweme/v1/playwm/?video_id=v0200f3e0000bv52fpn5t6p007e34q1g"),
    ("罗斯mvp.mp4", "https://aweme.snssdk.com/aweme/v1/playwm/?video_id=v0200f240000buuer5aa4tij4gv6ajqg"),
    ("B站.mp4", "https://www.bilibili.com/video/BV1h1421m7pL/?spm_id_from=333.1007.tianma.1-1-1.click&vd_source=f7031923c3818d4ee69e34c6533c5356")
]


def task(file_name, video_url):
    res = requests.get(video_url)
    with open(file_name, "wb") as f:
        f.write(res.content)
    print(time.time())


if __name__ == "__main__":
    print(time.time())
    for name, url in url_list:
        t = multiprocessing.Process(target=task, args=(name, url))
        t.start()
  • 注: 多进程的开销比多线程的开销大
3.GIL锁
  • GIL,全局解释器锁(Global Interpreter Lock), CPython解释器特特有的,让一个进程中,同一个时刻只能有一个线程可以被CPU调用
  • 常见的程序开发中,计算操作需要使用CPU多核优势,IO操作不需要利用CPU的多核优势
    • 计算密集型,用多进程
      • 大量的数据计算等
    • IO密集型,用多线程
      • 文件读写、网络数据传输等
1.串行处理

import time

# 开始时间
start_time = time.time()

result = 0
for i in range(1000000000):
    result += i
print(result)

# 结束时间
end_time = time.time()

print("程序运行时间:", end_time - start_time)  # 79s
2.多进程处理
def task(start, end, queue):
    result = 0
    for i in range(start, end):
        result += i
    queue.put(result)


if __name__ == '__main__':
    queue = multiprocessing.Queue()
    start_time = time.time()
    p1 = multiprocessing.Process(target=task, args=(0, 500000000, queue))
    p1.start()

    p2 = multiprocessing.Process(target=task, args=(500000000, 1000000000, queue))
    p2.start()

    v1 = queue.get(block=True)
    v2 = queue.get(block=True)
    print(v1 + v2)

    end_time = time.time()

    print("程序运行时间:", end_time - start_time)  # 程序运行时间: 21s
3.多进程+多线程
  • 多线程和多进程是可以结合使用,建议进程数量等于当前CPU的物理核心数
import multiprocessing
import threading


def thread_task():
    pass


def task(start, end):
    t1 = threading.Thread(target=thread_task)
    t1.start()

    t2 = threading.Thread(target=thread_task)
    t2.start()

    t3 = threading.Thread(target=thread_task)
    t3.start()


if __name__ == '__main__':
    p1 = multiprocessing.Process(target=task, args=(0, 50000000))
    p1.start()

    p2 = multiprocessing.Process(target=task, args=(50000000, 100000000))
    p2.start()

2.多线程开发

  • 基本格式

    import threading
    
    def task(arg):
    	pass
    
    
    # 创建一个Thread对象(线程),并封装线程被CPU调度时应该执行的任务和相关参数。
    t = threading.Thread(target=task,args=('xxx',))
    # 线程准备就绪(等待CPU调度),代码继续向下执行。
    t.start()
    
    print("继续执行...") # 主线程执行完所有代码,不结束(等待子线程)
    
1.线程常见的方法
  • t.start(), 当前线程准备就绪(等待CPU调度,具体时间是由CPU来决定的)

    import threading
    
    loop = 10000000
    number = 0
    
    
    def _add(count):
        global number
        for i in range(count):
            number += 1
    
    
    t = threading.Thread(target=_add, args=(loop,))
    t.start()
    
    print(number)  # 不固定数值
    
  • t.join(),等待当前线程的任务执行王弼后,再向下继续执行

    import threading
    
    loop = 10000000
    number = 0
    
    
    def _add(count):
        global number
        for i in range(count):
            number += 1
    
    
    t = threading.Thread(target=_add, args=(loop,))
    t.start()
    
    # 主线程等待中
    t.join()
    
    print(number)  # 10000000
    
  • t.setDaemon(布尔值),守护线程(必须放在start之前)------启用咯

    • 直接用线程对象.daemon设置布尔值即可

    • True,设置为守护线程,主线程执行完毕后,子线程也自动关闭

    • False,设置为非守护线程,主线程等待子线程,子线程执行完毕之后,主线程才结束(默认)

      import threading
      import time
      
      
      def task(arg):
          time.sleep(5)
          print("任务执行结束")
      
      
      t = threading.Thread(target=task, args=(11,))
      # python3.9 以上版本 弃用了setDaemon函数
      # t.setDaemon(True)
      '''
      thread.daemon的参数设置
          True:当线程结束时,程序会自动结束
          False:当线程结束时,程序不会自动结束
          
          子线程设置等待时间,主线程不回等待子线程执行结束,程序终止
      '''
      t.daemon = True
      t.start()
      
      print('程序执行结束')
      
  • 线程名称的设置和获取

    import threading
    import time
    
    
    def task(arg):
        time.sleep(5)
        name = threading.current_thread().name
        print("当前线程的名字:" + name)
        print("任务执行结束")
    
    
    t = threading.Thread(target=task, args=(11,))
    # python3.9 以上版本 弃用了setDaemon函数
    # t.setDaemon(True)
    '''
    thread.daemon的参数设置
        True:当线程结束时,程序会自动结束
        False:当线程结束时,程序不会自动结束
        
        子线程设置等待时间,主线程不回等待子线程执行结束,程序终止
    '''
    # 设置线程名称
    t.name = '子线程'
    # 设置线程名称的方法弃用
    # t.setName('子线程')
    t.daemon = False
    t.start()
    
    print('程序执行结束')
    
  • 自定义线程类,直接将线程需要做的事写到run方法中

    import threading
    
    import requests
    
    
    class DouYinThread(threading.Thread):
        def run(self):
            file_name, video_url = self._args
            res = requests.get(video_url)
            with open(file_name, 'wb') as f:
                f.write(res.content)
    
    
    url_list = [
        ("东北F4模仿秀.mp4", "https://aweme.snssdk.com/aweme/v1/playwm/?video_id=v0300f570000bvbmace0gvch7lo53oog"),
        ("卡特扣篮.mp4", "https://aweme.snssdk.com/aweme/v1/playwm/?video_id=v0200f3e0000bv52fpn5t6p007e34q1g"),
        ("罗斯mvp.mp4", "https://aweme.snssdk.com/aweme/v1/playwm/?video_id=v0200f240000buuer5aa4tij4gv6ajqg")
    ]
    
    for item in url_list:
        t = DouYinThread(args=(item[0], item[1]))
        t.start()
    

3.线程安全

  • 一个进程可以有多个线程,且线程共享所有进程中的资源

  • 多个线程同时去操作一个"资源",可能会存在数据混乱的情况

    import threading
    
    loop = 10000000
    number = 0
    
    lock_object = threading.RLock()
    
    for i in range(1000):
        def _add(count):
            # 加锁
            lock_object.acquire()
            global number
            for i in range(count):
                number += 1
            # 解锁
            lock_object.release()
    
    
        def _sub(count):
            # 加锁
            lock_object.acquire()
            global number
            for i in range(count):
                number -= 1
            # 解锁
            lock_object.release()
    
    
        t1 = threading.Thread(target=_add, args=(loop,))
        t2 = threading.Thread(target=_sub, args=(loop,))
        t1.start()
        t2.start()
    
        t1.join()  # t1线程执行完毕,才继续往后走
        t2.join()  # t2线程执行完毕,才继续往后走
    
        print(number)
    
    import threading
    
    num = 0
    lock_object = threading.RLock()
    
    
    def task():
        global num
        for i in range(1000000):
            num += 1
        print(num)
    
    # def task():
    #     print("开始")
    #     # 加锁
    #     lock_object.acquire()
    #     global num
    #     for i in range(1000000):
    #         num += 1
    #     # 解锁
    #     lock_object.release()
    #     print(num)
    
    
    def task():
        print("开始")
        # 基于上下文管理,内部自动调用acquire和release
        with lock_object:
            global num
            for i in range(1000000):
                num += 1
            '''
                开始
                开始
                1000000
                开始
                开始
                开始
                2000000
                开始
                开始开始
                
                3000000
                4000000
                5000000
                6000000
                7000000
                8000000
            '''
            print(num)
    
    
    for i in range(8):
        t = threading.Thread(target=task)
        t.start()
    

4.线程锁

  • 开发中手动加锁,一般有两种: LockRLock
1.Lock,同步锁
import threading

num = 0
# Lock 同步锁
lock_object = threading.Lock()
# RLock 递归锁
# lock_object = threading.RLock()


def task():
    print("开始")
    # 基于上下文管理,内部自动调用acquire和release
    with lock_object:
        global num
        for i in range(1000000):
            num += 1
        print(num)


for i in range(2):
    t = threading.Thread(target=task)
    t.start()
2.RLock,递归锁
import threading

num = 0
# Lock 同步锁
# lock_object = threading.Lock()
# RLock 递归锁
lock_object = threading.RLock()


def task():
    print("开始")
    # 基于上下文管理,内部自动调用acquire和release
    with lock_object:
        global num
        for i in range(1000000):
            num += 1
        print(num)


for i in range(2):
    t = threading.Thread(target=task)
    t.start()

3.RLock 和 Lock的区别
  • RLock支持多次申请锁和多次释放锁Lock不支持多次

5.死锁

  • 死锁,由于竞争资源或者彼此通信而造成的一种阻塞的现象

    import threading
    
    
    class DeadlockDemo:
        def __init__(self):
            self.lock1 = threading.Lock()
            self.lock2 = threading.Lock()
            self.resource1 = "Resource 1"
            self.resource2 = "Resource 2"
    
        def thread_function_1(self):
            self.lock1.acquire()
            print(f"Thread 1 acquired {self.resource1}")
    
            # 模拟一些工作
            # ...
    
            print(f"Thread 1 is trying to acquire {self.resource2}")
            self.lock2.acquire()
            print(f"Thread 1 acquired {self.resource2}")
    
            # 释放锁
            self.lock2.release()
            self.lock1.release()
    
        def thread_function_2(self):
            self.lock2.acquire()
            print(f"Thread 2 acquired {self.resource2}")
    
            # 模拟一些工作
            # ...
    
            print(f"Thread 2 is trying to acquire {self.resource1}")
            self.lock1.acquire()
            print(f"Thread 2 acquired {self.resource1}")
    
            # 释放锁
            self.lock1.release()
            self.lock2.release()
    
    
    if __name__ == "__main__":
        demo = DeadlockDemo()
    
        # 创建并启动线程
        thread1 = threading.Thread(target=demo.thread_function_1)
        thread2 = threading.Thread(target=demo.thread_function_2)
        thread1.start()
        thread2.start()
    
        # 等待线程结束
        thread1.join()
        thread2.join()
    

6.线程池

  • Python3中官方才正式提供线程池

  • 线程不是开的越多越好,开的多了可能会导致系统的性能更低了

  • 不建议

    • 无限制的创建线程

      import threading
      
      
      def task(video_url):
          pass
      
      
      url_list = ["www.xxxx-{}.com".format(i) for i in range(30000)]
      
      # 每次创建一个线程去操作,创建任务太多,线程就会特别多,可能效率反倒降低
      for url in url_list:
          t = threading.Thread(target=task, args=(url,))
          t.start()
      
  • 建议

    • 使用线程池

      import random
      import time
      from concurrent.futures import ThreadPoolExecutor
      
      
      def task(video_url):
          print("开始执行任务", video_url)
          time.sleep(2)
          return random.randint(0, 10)
      
      
      def done(response):
          print("任务执行完毕", response.result())
      
      
      # 创建线程池,最多维护10个线程
      pool = ThreadPoolExecutor(10)
      
      url_list = ["www.xxxx-{}.com".format(x) for x in range(300)]
      
      for url in url_list:
          # 在线程池提交一个任务,线程池中如果有空闲线程,则分配一个线程去执行,执行完毕后,再讲线程交换给线程池;如果没有空闲线程,则等待
          result = pool.submit(task, url)
          # 等待任务执行完毕
          result.add_done_callback(done)
      
      print("END")
      

7.单例模式

  • 单例模式,每次实例化类的对象时,都是最开始创建的那个对象,不再重复创建对象

    # 简单单例模式
    import threading
    import time
    
    
    class Singleton:
        instance = None
        lock = threading.RLock()
    
        def __init__(self, name):
            self.name = name
    
        # def __new__(cls, *args, **kwargs):
        #     if cls.instance:
        #         return cls.instance
        #     cls.instance = object.__new__(cls)
        #     return cls.instance
    
        def __new__(cls, *args, **kwargs):
            if cls.instance:
                return cls.instance
            with cls.lock:
                if cls.instance:
                    return cls.instance
                time.sleep(0.1)
                cls.instance = object.__new__(cls)
                return cls.instance
    
    
    def task():
        obj = Singleton('Tom')
        print(obj.name)
    
    
    for i in range(10):
        t = threading.Thread(target=task)
        t.start()
    
    
    data = Singleton("李四")
    print(data.name)
    
    # obj1 = Singleton('张三')
    # obj2 = Singleton('李四')
    
    # print(obj1.name, obj2.name)  # 李四 李四
    

8.多进程开发

  • 进程时计算机中资源分配的最小单元,一个进程中可以有多个线程,同一个进程中的线程共享资源
  • 进程和进程之间则是相互隔离
  • Python中通过多进程可以利用CPU的多核优势,计算密集型操作适用于多进程
1.进程介绍
import multiprocessing


def task(arg):
    pass


def run():
    p = multiprocessing.Process(target=task, args=(1,))
    p.start()


if __name__ == '__main__':
    run()

2.进程常见功能
  • 和线程基本类似

9.进程间数据的共享

  • 进程时资源分配的最小单元,每个进程中都维护自己独立的数据,不共享

    import multiprocessing
    
    
    def task(data):
        data.append(666)
    
    
    if __name__ == '__main__':
        data_list = []
        p = multiprocessing.Process(target=task, args=(data_list,))
        p.start()
        p.join()
    
        print("主进程:", data_list)  # 主进程: []
    
1.共享
1.Shared memory
  • Data can be stored in a shared memory map using Value or Array

        'c': ctypes.c_char,  'u': ctypes.c_wchar,
        'b': ctypes.c_byte,  'B': ctypes.c_ubyte, 
        'h': ctypes.c_short, 'H': ctypes.c_ushort,
        'i': ctypes.c_int,   'I': ctypes.c_uint,  (其u表示无符号)
        'l': ctypes.c_long,  'L': ctypes.c_ulong, 
        'f': ctypes.c_float, 'd': ctypes.c_double
    
    from multiprocessing import Process, Value, Array
    
    
    def func(num, m1, m2):
        num.value = 888
        m1.value = 'a'.encode("utf-8")
        m2.value = "梁"
    
    
    def f(data_array):
        data_array[0] = 666
    
    
    if __name__ == '__main__':
        num = Value('i', 666)
        m1 = Value('c')
        m2 = Value('u')
    
        arr = Array('i', [11, 58, 96])
    
        p = Process(target=func, args=(num, m1, m2))
        p.start()
        p.join()
    
        print(num.value)  # 888
        print(m1.value)  # b 'a'
        print(m2.value)  # 梁
    
        print(arr[:])  # [11, 58, 96]
    
    
2.Server process
  • A manager object returned by Manager() controls a server process which holds Python and allows other process to manipulate them using proxies

    from multiprocessing import Process, Manager
    
    
    def func(d, l):
        d[1] = '1'
        d['2'] = 2
        d[0.25] = None
        l.append(666)
    
    
    if __name__ == '__main__':
        with Manager() as mgr:
            d = mgr.dict()
            l = mgr.list()
    
            p = Process(target=func, args=(d, l))
            p.start()
            p.join()
    
            print(d)  # {1: '1', '2': 2, 0.25: None}
            print(l)  # [666]
    
2.交换

multiprocessing支持进程之间的两种通信通道

1.队列
  • Queue类是一个近似queue.Queue的克隆

    from multiprocessing import Process, Queue
    
    def f(q):
        q.put([42, None, 'hello'])
    
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=f, args=(q,))
        p.start()
        print(q.get())    # prints "[42, None, 'hello']"
        p.join()
    
    • 队列线程进程安全
2.管道
  • **Pipe()**函数返回一个由管道链接的链接对象,默认情况下是双工的

    from multiprocessing import Process, Pipe
    
    def f(conn):
        conn.send([42, None, 'hello'])
        conn.close()
    
    if __name__ == '__main__':
        parent_conn, child_conn = Pipe()
        p = Process(target=f, args=(child_conn,))
        p.start()
        print(parent_conn.recv())   # prints "[42, None, 'hello']"
        p.join()
    
    • 每个连接对象都有send() 和 **recv()**方法
    • 两个线程(或进程)同时尝试读取或写入管道的同一端,则管道中的数据可能会损坏
    • 不同进程中同时使用管道的不同端的情况下不存在损坏的风险

10.进程锁

  • 多个进程抢占式去做某些操作时候,为了防止操作出问题,可以通过进程锁来避免。

    import time
    import multiprocessing
    
    def task(lock):
        print("开始")
        lock.acquire()
        # 假设文件中保存的内容就是一个值:10
        with open('f1.txt', mode='r', encoding='utf-8') as f:
            current_num = int(f.read())
    
        print("排队抢票了")
        time.sleep(1)
        current_num -= 1
    
        with open('f1.txt', mode='w', encoding='utf-8') as f:
            f.write(str(current_num))
        lock.release()
    
    
    if __name__ == '__main__':
        multiprocessing.set_start_method('fork')
        lock = multiprocessing.RLock()
        for i in range(10):
            p = multiprocessing.Process(target=task, args=(lock,))
            p.start()
    

11.进程池

  • 注意:如果在进程池中要使用进程锁,则需要基于Manager中的Lock和RLock来实现

    import os
    import time
    from concurrent.futures import ProcessPoolExecutor
    
    
    def task(file_name):
        ip_set = set()
        total_count = 0
        ip_count = 0
        file_path = os.path.join("files", file_name)
        file_object = open(file_path, mode='r', encoding='utf-8')
        for line in file_object:
            if not line.strip():
                continue
            user_ip = line.split(" - -", maxsplit=1)[0].split(",")[0]
            total_count += 1
            if user_ip in ip_set:
                continue
            ip_count += 1
            ip_set.add(user_ip)
        time.sleep(1)
        return {"total": total_count, 'ip': ip_count}
    
    
    def outer(info, file_name):
        def done(res, *args, **kwargs):
            info[file_name] = res.result()
    
        return done
    
    
    def run():
        # 根据目录读取文件并初始化字典
        """
            1.读取目录下所有的文件,每个进程处理一个文件。
        """
        info = {}
        
        pool = ProcessPoolExecutor(4)
    
        for file_name in os.listdir("files"):
            fur = pool.submit(task, file_name)
            fur.add_done_callback(  outer(info, file_name)  ) # 回调函数:主进程
    
        pool.shutdown(True)
        for k, v in info.items():
            print(k, v)
    
    
    if __name__ == '__main__':
        run()
    

12.协程

  • Python的协程通常基于 生成器async/await 语法来实现
1.使用生成器实现协程
  • 在 Python 2.5 引入了生成器(Generator),Python 3.5 及之后的版本支持生成器的协程特性,主要利用 yield 关键字来实现协程功能

    def coroutine():
        while True:
            received = yield
            print("Received:", received)
    
    # 创建协程对象
    coro = coroutine()
    
    # 启动协程(即初始化生成器)
    next(coro)
    
    # 发送数据给协程
    coro.send("Hello")
    coro.send("World")
    
    
2.使用async/await语法实现的协程
  • Python 3.5 引入了 asyncawait 关键字,用于定义异步函数和进行协程操作,这种方式更加简洁和直观

    import asyncio
    
    async def coroutine():
        while True:
            received = await asyncio.sleep(1)  # 模拟异步操作
            print("Received:", received)
    
    # 创建事件循环
    loop = asyncio.get_event_loop()
    
    # 启动协程任务
    task = loop.create_task(coroutine())
    
    # 运行事件循环
    loop.run_forever()
    
    
3.主要特点和用途
  • 非抢占式调度:协程可以在特定的点上主动释放控制权,让出执行权给其他协程,而不是像线程那样由操作系统进行抢占式调度。
  • 提高并发性能:协程可以减少线程/进程的切换开销,提高并发处理能力。
  • 适用于异步 I/O:协程常用于异步 I/O 操作,如网络通信、文件读写等,能有效提高程序的响应速度和吞吐量。
举报

相关推荐

0 条评论