0
点赞
收藏
分享

微信扫一扫

基于Python和Flask框架的新能源汽车可视化大屏系统

以下是一个基于Python和Flask框架的新能源汽车可视化大屏系统后台代码示例。这个系统提供API接口用于前端大屏展示新能源汽车相关数据。

  1. 主应用文件 (app.py) python from flask import Flask, jsonify, request from flask_cors import CORS import random from datetime import datetime, timedelta import time import threading import sqlite3 from collections import defaultdict

app = Flask(name) CORS(app) # 允许跨域请求

模拟数据库

DATABASE = 'new_energy_vehicles.db'

def init_db(): conn = sqlite3.connect(DATABASE) c = conn.cursor()

# 创建车辆信息表
c.execute('''CREATE TABLE IF NOT EXISTS vehicles
             (id INTEGER PRIMARY KEY AUTOINCREMENT,
              vin TEXT UNIQUE,
              brand TEXT,
              model TEXT,
              battery_capacity REAL,
              range INTEGER,
              production_date TEXT,
              status TEXT)''')

# 创建充电记录表
c.execute('''CREATE TABLE IF NOT EXISTS charging_records
             (id INTEGER PRIMARY KEY AUTOINCREMENT,
              vin TEXT,
              station_id TEXT,
              start_time TEXT,
              end_time TEXT,
              energy_consumed REAL,
              cost REAL)''')

# 创建行驶记录表
c.execute('''CREATE TABLE IF NOT EXISTS driving_records
             (id INTEGER PRIMARY KEY AUTOINCREMENT,
              vin TEXT,
              start_time TEXT,
              end_time TEXT,
              distance REAL,
              energy_consumed REAL,
              avg_speed REAL)''')

# 创建充电站表
c.execute('''CREATE TABLE IF NOT EXISTS charging_stations
             (id INTEGER PRIMARY KEY AUTOINCREMENT,
              station_id TEXT UNIQUE,
              name TEXT,
              location TEXT,
              total_ports INTEGER,
              available_ports INTEGER,
              status TEXT)''')

conn.commit()
conn.close()

初始化数据库

init_db()

模拟数据生成器

class DataGenerator: @staticmethod def generate_vehicle_data(): brands = ['特斯拉', '比亚迪', '蔚来', '小鹏', '理想', '广汽埃安', '极氪', '哪吒'] models = { '特斯拉': ['Model 3', 'Model Y', 'Model S', 'Model X'], '比亚迪': ['汉EV', '唐EV', '秦PLUS EV', '海豚'], '蔚来': ['ES6', 'ES8', 'ET7', 'ET5'], '小鹏': ['P7', 'G3', 'P5', 'G9'], '理想': ['L9', 'L8', 'L7', 'ONE'], '广汽埃安': ['LX', 'V', 'Y', 'S'], '极氪': ['001', '009'], '哪吒': ['U', 'V', 'S'] }

    brand = random.choice(brands)
    model = random.choice(models[brand])
    battery_capacity = round(random.uniform(30, 100), 1)
    vehicle_range = int(battery_capacity * random.uniform(5, 7))
    
    return {
        'vin': f'LV{random.randint(1000000000000000, 9999999999999999)}',
        'brand': brand,
        'model': model,
        'battery_capacity': battery_capacity,
        'range': vehicle_range,
        'production_date': f'{random.randint(2018, 2023)}-{random.randint(1, 12):02d}-{random.randint(1, 28):02d}',
        'status': random.choice(['行驶中', '充电中', '空闲', '维修中'])
    }

@staticmethod
def generate_charging_station_data():
    locations = ['北京', '上海', '广州', '深圳', '杭州', '成都', '重庆', '武汉']
    prefixes = ['国家电网', '特来电', '星星充电', '南方电网', '特斯拉超充']
    
    return {
        'station_id': f'CS{random.randint(100000, 999999)}',
        'name': f'{random.choice(prefixes)}{random.choice(["中心站", "快充站", "超级充电站"])}',
        'location': f'{random.choice(locations)}{random.choice(["市", ""])}{random.choice(["朝阳区", "浦东新区", "天河区", "南山区", "余杭区", "高新区"])}',
        'total_ports': random.randint(4, 20),
        'available_ports': random.randint(0, 4),
        'status': random.choice(['运营中', '建设中', '维护中'])
    }

初始化一些模拟数据

def init_sample_data(): conn = sqlite3.connect(DATABASE) c = conn.cursor()

# 检查是否有数据,没有则插入
c.execute("SELECT COUNT(*) FROM vehicles")
if c.fetchone()[0] == 0:
    for _ in range(100):
        vehicle = DataGenerator.generate_vehicle_data()
        c.execute("INSERT INTO vehicles (vin, brand, model, battery_capacity, range, production_date, status) VALUES (?, ?, ?, ?, ?, ?, ?)",
                  (vehicle['vin'], vehicle['brand'], vehicle['model'], vehicle['battery_capacity'], 
                   vehicle['range'], vehicle['production_date'], vehicle['status']))

c.execute("SELECT COUNT(*) FROM charging_stations")
if c.fetchone()[0] == 0:
    for _ in range(20):
        station = DataGenerator.generate_charging_station_data()
        c.execute("INSERT INTO charging_stations (station_id, name, location, total_ports, available_ports, status) VALUES (?, ?, ?, ?, ?, ?)",
                  (station['station_id'], station['name'], station['location'], 
                   station['total_ports'], station['available_ports'], station['status']))

conn.commit()
conn.close()

init_sample_data()

实时数据更新线程

class RealTimeDataUpdater(threading.Thread): def init(self): super().init() self.daemon = True self.running = True

def run(self):
    while self.running:
        try:
            conn = sqlite3.connect(DATABASE)
            c = conn.cursor()
            
            # 更新车辆状态
            c.execute("SELECT vin FROM vehicles")
            vins = [row[0] for row in c.fetchall()]
            
            for vin in vins:
                new_status = random.choice(['行驶中', '充电中', '空闲', '维修中'])
                c.execute("UPDATE vehicles SET status=? WHERE vin=?", (new_status, vin))
                
                # 随机生成行驶或充电记录
                if new_status == '行驶中':
                    start_time = datetime.now() - timedelta(minutes=random.randint(1, 120))
                    end_time = start_time + timedelta(minutes=random.randint(10, 180))
                    distance = round(random.uniform(5, 200), 1)
                    energy_consumed = round(distance * random.uniform(0.12, 0.18), 1)
                    avg_speed = random.randint(30, 90)
                    
                    c.execute("INSERT INTO driving_records (vin, start_time, end_time, distance, energy_consumed, avg_speed) VALUES (?, ?, ?, ?, ?, ?)",
                              (vin, start_time.isoformat(), end_time.isoformat(), distance, energy_consumed, avg_speed))
                
                elif new_status == '充电中':
                    c.execute("SELECT station_id FROM charging_stations WHERE status='运营中' AND available_ports>0 ORDER BY RANDOM() LIMIT 1")
                    station = c.fetchone()
                    if station:
                        station_id = station[0]
                        start_time = datetime.now() - timedelta(minutes=random.randint(1, 60))
                        end_time = start_time + timedelta(minutes=random.randint(30, 180))
                        energy_consumed = round(random.uniform(10, 60), 1)
                        cost = round(energy_consumed * random.uniform(1.2, 2.5), 2)
                        
                        c.execute("INSERT INTO charging_records (vin, station_id, start_time, end_time, energy_consumed, cost) VALUES (?, ?, ?, ?, ?, ?)",
                                  (vin, station_id, start_time.isoformat(), end_time.isoformat(), energy_consumed, cost))
                        
                        # 减少可用充电端口
                        c.execute("UPDATE charging_stations SET available_ports=available_ports-1 WHERE station_id=?", (station_id,))
            
            # 更新充电站可用端口
            c.execute("SELECT station_id FROM charging_stations WHERE status='运营中'")
            stations = [row[0] for row in c.fetchall()]
            
            for station_id in stations:
                c.execute("SELECT available_ports, total_ports FROM charging_stations WHERE station_id=?", (station_id,))
                available, total = c.fetchone()
                
                # 随机增加或减少可用端口
                change = random.randint(-2, 2)
                new_available = max(0, min(total, available + change))
                
                if new_available != available:
                    c.execute("UPDATE charging_stations SET available_ports=? WHERE station_id=?", (new_available, station_id))
            
            conn.commit()
            conn.close()
            
            # 每分钟更新一次
            time.sleep(60)
        
        except Exception as e:
            print(f"Error in real-time data updater: {e}")
            time.sleep(10)

def stop(self):
    self.running = False

启动实时数据更新线程

data_updater = RealTimeDataUpdater() data_updater.start()

API路由

@app.route('/api/dashboard/summary', methods=['GET']) def get_dashboard_summary(): conn = sqlite3.connect(DATABASE) c = conn.cursor()

# 车辆总数
c.execute("SELECT COUNT(*) FROM vehicles")
total_vehicles = c.fetchone()[0]

# 按品牌统计
c.execute("SELECT brand, COUNT(*) FROM vehicles GROUP BY brand")
brands = {row[0]: row[1] for row in c.fetchall()}

# 按状态统计
c.execute("SELECT status, COUNT(*) FROM vehicles GROUP BY status")
statuses = {row[0]: row[1] for row in c.fetchall()}

# 充电站统计
c.execute("SELECT COUNT(*) FROM charging_stations WHERE status='运营中'")
operating_stations = c.fetchone()[0]

c.execute("SELECT SUM(total_ports) FROM charging_stations WHERE status='运营中'")
total_ports = c.fetchone()[0] or 0

c.execute("SELECT SUM(available_ports) FROM charging_stations WHERE status='运营中'")
available_ports = c.fetchone()[0] or 0

# 今日充电量
today = datetime.now().date().isoformat()
c.execute("SELECT SUM(energy_consumed) FROM charging_records WHERE date(start_time)=?", (today,))
today_charging = c.fetchone()[0] or 0

# 今日行驶里程
c.execute("SELECT SUM(distance) FROM driving_records WHERE date(start_time)=?", (today,))
today_distance = c.fetchone()[0] or 0

conn.close()

return jsonify({
    'total_vehicles': total_vehicles,
    'brand_distribution': brands,
    'status_distribution': statuses,
    'charging_stations': {
        'total': operating_stations,
        'ports': {
            'total': total_ports,
            'available': available_ports,
            'utilization': round((total_ports - available_ports) / total_ports * 100, 1) if total_ports > 0 else 0
        }
    },
    'today_stats': {
        'charging_energy': round(today_charging, 1),
        'distance': round(today_distance, 1)
    },
    'timestamp': datetime.now().isoformat()
})

@app.route('/api/vehicles/list', methods=['GET']) def get_vehicle_list(): page = int(request.args.get('page', 1)) page_size = int(request.args.get('page_size', 10)) offset = (page - 1) * page_size

conn = sqlite3.connect(DATABASE)
c = conn.cursor()

c.execute("SELECT * FROM vehicles LIMIT ? OFFSET ?", (page_size, offset))
vehicles = []
for row in c.fetchall():
    vehicles.append({
        'id': row[0],
        'vin': row[1],
        'brand': row[2],
        'model': row[3],
        'battery_capacity': row[4],
        'range': row[5],
        'production_date': row[6],
        'status': row[7]
    })

c.execute("SELECT COUNT(*) FROM vehicles")
total = c.fetchone()[0]

conn.close()

return jsonify({
    'data': vehicles,
    'pagination': {
        'page': page,
        'page_size': page_size,
        'total': total,
        'total_pages': (total + page_size - 1) // page_size
    }
})

@app.route('/api/charging/stations', methods=['GET']) def get_charging_stations(): conn = sqlite3.connect(DATABASE) c = conn.cursor()

c.execute("SELECT * FROM charging_stations")
stations = []
for row in c.fetchall():
    stations.append({
        'id': row[0],
        'station_id': row[1],
        'name': row[2],
        'location': row[3],
        'total_ports': row[4],
        'available_ports': row[5],
        'status': row[6]
    })

conn.close()

return jsonify(stations)

@app.route('/api/charging/records', methods=['GET']) def get_charging_records(): days = int(request.args.get('days', 7)) start_date = (datetime.now() - timedelta(days=days)).date().isoformat()

conn = sqlite3.connect(DATABASE)
c = conn.cursor()

# 按天统计充电量
c.execute('''SELECT date(start_time) as day, SUM(energy_consumed), COUNT(*)
             FROM charging_records 
             WHERE date(start_time) >= ?
             GROUP BY day
             ORDER BY day''', (start_date,))

daily_stats = []
for row in c.fetchall():
    daily_stats.append({
        'date': row[0],
        'energy': round(row[1], 1),
        'sessions': row[2]
    })

# 按充电站统计
c.execute('''SELECT s.name, COUNT(*) as sessions, SUM(c.energy_consumed) as energy
             FROM charging_records c
             JOIN charging_stations s ON c.station_id = s.station_id
             WHERE date(c.start_time) >= ?
             GROUP BY s.name
             ORDER BY energy DESC
             LIMIT 10''', (start_date,))

station_stats = []
for row in c.fetchall():
    station_stats.append({
        'station': row[0],
        'sessions': row[1],
        'energy': round(row[2], 1)
    })

conn.close()

return jsonify({
    'daily_stats': daily_stats,
    'station_stats': station_stats
})

@app.route('/api/driving/records', methods=['GET']) def get_driving_records(): days = int(request.args.get('days', 7)) start_date = (datetime.now() - timedelta(days=days)).date().isoformat()

conn = sqlite3.connect(DATABASE)
c = conn.cursor()

# 按天统计行驶里程
c.execute('''SELECT date(start_time) as day, SUM(distance), SUM(energy_consumed), COUNT(*)
             FROM driving_records 
             WHERE date(start_time) >= ?
             GROUP BY day
             ORDER BY day''', (start_date,))

daily_stats = []
for row in c.fetchall():
    daily_stats.append({
        'date': row[0],
        'distance': round(row[1], 1),
        'energy': round(row[2], 1),
        'trips': row[3],
        'efficiency': round(row[1] / row[2], 1) if row[2] > 0 else 0
    })

# 按品牌统计平均效率
c.execute('''SELECT v.brand, AVG(d.distance/d.energy_consumed) as avg_efficiency
             FROM driving_records d
             JOIN vehicles v ON d.vin = v.vin
             WHERE date(d.start_time) >= ?
             GROUP BY v.brand
             ORDER BY avg_efficiency DESC''', (start_date,))

brand_efficiency = []
for row in c.fetchall():
    brand_efficiency.append({
        'brand': row[0],
        'efficiency': round(row[1], 1)
    })

conn.close()

return jsonify({
    'daily_stats': daily_stats,
    'brand_efficiency': brand_efficiency
})

@app.route('/api/map/vehicles', methods=['GET']) def get_map_vehicles(): # 模拟车辆位置数据 vehicles = [] conn = sqlite3.connect(DATABASE) c = conn.cursor()

c.execute("SELECT vin, brand, model, status FROM vehicles WHERE status IN ('行驶中', '充电中') LIMIT 50")
for row in c.fetchall():
    # 生成随机位置(中国范围内)
    lat = round(random.uniform(21.0, 42.0), 6)
    lng = round(random.uniform(98.0, 122.0), 6)
    
    vehicles.append({
        'vin': row[0],
        'brand': row[1],
        'model': row[2],
        'status': row[3],
        'location': [lat, lng],
        'battery': random.randint(10, 90)
    })

conn.close()

return jsonify(vehicles)

@app.route('/api/map/stations', methods=['GET']) def get_map_stations(): # 获取充电站位置数据 stations = [] conn = sqlite3.connect(DATABASE) c = conn.cursor()

c.execute("SELECT station_id, name, location, available_ports, total_ports FROM charging_stations WHERE status='运营中'")
for row in c.fetchall():
    # 从位置信息中提取坐标(模拟)
    # 实际应用中应该存储真实的经纬度
    lat = round(random.uniform(21.0, 42.0), 6)
    lng = round(random.uniform(98.0, 122.0), 6)
    
    stations.append({
        'id': row[0],
        'name': row[1],
        'location': row[2],
        'coordinates': [lat, lng],
        'available_ports': row[3],
        'total_ports': row[4],
        'utilization': round((row[4] - row[3]) / row[4] * 100, 1) if row[4] > 0 else 0
    })

conn.close()

return jsonify(stations)

if name == 'main': app.run(host='0.0.0.0', port=5000, debug=True) 2. 数据库初始化脚本 (init_db.py) python import sqlite3 from app import DATABASE, DataGenerator

def init_database(): conn = sqlite3.connect(DATABASE) c = conn.cursor()

# 删除现有表
c.execute("DROP TABLE IF EXISTS vehicles")
c.execute("DROP TABLE IF EXISTS charging_records")
c.execute("DROP TABLE IF EXISTS driving_records")
c.execute("DROP TABLE IF EXISTS charging_stations")

# 重新创建表
c.execute('''CREATE TABLE vehicles
             (id INTEGER PRIMARY KEY AUTOINCREMENT,
              vin TEXT UNIQUE,
              brand TEXT,
              model TEXT,
              battery_capacity REAL,
              range INTEGER,
              production_date TEXT,
              status TEXT)''')

c.execute('''CREATE TABLE charging_records
             (id INTEGER PRIMARY KEY AUTOINCREMENT,
              vin TEXT,
              station_id TEXT,
              start_time TEXT,
              end_time TEXT,
              energy_consumed REAL,
              cost REAL)''')

c.execute('''CREATE TABLE driving_records
             (id INTEGER PRIMARY KEY AUTOINCREMENT,
              vin TEXT,
              start_time TEXT,
              end_time TEXT,
              distance REAL,
              energy_consumed REAL,
              avg_speed REAL)''')

c.execute('''CREATE TABLE charging_stations
             (id INTEGER PRIMARY KEY AUTOINCREMENT,
              station_id TEXT UNIQUE,
              name TEXT,
              location TEXT,
              total_ports INTEGER,
              available_ports INTEGER,
              status TEXT)''')

# 插入模拟数据
for _ in range(100):
    vehicle = DataGenerator.generate_vehicle_data()
    c.execute("INSERT INTO vehicles (vin, brand, model, battery_capacity, range, production_date, status) VALUES (?, ?, ?, ?, ?, ?, ?)",
              (vehicle['vin'], vehicle['brand'], vehicle['model'], vehicle['battery_capacity'], 
               vehicle['range'], vehicle['production_date'], vehicle['status']))

for _ in range(20):
    station = DataGenerator.generate_charging_station_data()
    c.execute("INSERT INTO charging_stations (station_id, name, location, total_ports, available_ports, status) VALUES (?, ?, ?, ?, ?, ?)",
              (station['station_id'], station['name'], station['location'], 
               station['total_ports'], station['available_ports'], station['status']))

conn.commit()
conn.close()
print("数据库初始化完成!")

if name == 'main': init_database() 3. 配置文件 (config.py) python import os

basedir = os.path.abspath(os.path.dirname(file))

class Config: # 基础配置 SECRET_KEY = os.environ.get('SECRET_KEY') or 'dev-key-123' DATABASE = os.path.join(basedir, 'new_energy_vehicles.db')

# API配置
API_PREFIX = '/api'
API_VERSION = 'v1'

# 数据更新间隔(秒)
DATA_UPDATE_INTERVAL = 60

# 日志配置
LOG_FILE = 'nevs.log'
LOG_LEVEL = 'INFO'

@staticmethod
def init_app(app):
    pass

class DevelopmentConfig(Config): DEBUG = True LOG_LEVEL = 'DEBUG'

class ProductionConfig(Config): DEBUG = False

config = { 'development': DevelopmentConfig, 'production': ProductionConfig, 'default': DevelopmentConfig } 4. 前端API调用示例 javascript // 使用axios调用API示例

// 1. 获取仪表板摘要数据 axios.get('/api/dashboard/summary') .then(response => { console.log('仪表板数据:', response.data); // 处理数据... }) .catch(error => { console.error('获取仪表板数据失败:', error); });

// 2. 获取车辆列表 axios.get('/api/vehicles/list', { params: { page: 1, page_size: 10 } }) .then(response => { console.log('车辆列表:', response.data); // 处理数据... });

// 3. 获取充电记录统计 axios.get('/api/charging/records', { params: { days: 30 } }) .then(response => { console.log('充电记录:', response.data); // 处理数据... });

// 4. 获取地图数据 Promise.all([ axios.get('/api/map/vehicles'), axios.get('/api/map/stations') ]) .then(([vehiclesRes, stationsRes]) => { console.log('地图车辆数据:', vehiclesRes.data); console.log('地图充电站数据:', stationsRes.data); // 处理数据... }); 系统功能说明 数据概览仪表板:

显示车辆总数、品牌分布、状态分布

充电站统计信息

今日充电量和行驶里程

车辆管理:

车辆列表分页查询

车辆状态实时更新

充电管理:

充电站信息查询

充电记录统计

充电站利用率分析

行驶数据分析:

行驶里程统计

能耗效率分析

品牌能耗对比

地图可视化:

车辆实时位置显示

充电站位置及状态显示

部署说明 安装依赖:

pip install flask flask-cors sqlite3 初始化数据库:

python init_db.py 启动服务:

python app.py 访问API:

仪表板数据: http://localhost:5000/api/dashboard/summary

车辆列表: http://localhost:5000/api/vehicles/list

充电站地图数据: http://localhost:5000/api/map/stations

这个系统提供了新能源汽车监控和分析所需的基本API接口,前端可以通过这些接口获取数据并实现可视化大屏展示。

举报

相关推荐

可视化大屏

0 条评论