以下是一个基于Python和Flask框架的新能源汽车可视化大屏系统后台代码示例。这个系统提供API接口用于前端大屏展示新能源汽车相关数据。
# Main application file (app.py)
import random
import sqlite3
import threading
import time
from collections import defaultdict
from datetime import datetime, timedelta

from flask import Flask, jsonify, request
from flask_cors import CORS

# Flask needs the importing module's name (``__name__``) to locate its root path.
app = Flask(__name__)
CORS(app)  # Allow cross-origin requests

# Path of the SQLite file that acts as the simulated database
DATABASE = 'new_energy_vehicles.db'
def init_db():
    """Create the application tables in the SQLite file if they do not exist.

    Creates ``vehicles``, ``charging_records``, ``driving_records`` and
    ``charging_stations`` with ``CREATE TABLE IF NOT EXISTS``, so calling this
    repeatedly is harmless. The connection is always closed, even when one of
    the statements fails.
    """
    conn = sqlite3.connect(DATABASE)
    try:
        c = conn.cursor()

        # Vehicle master data
        c.execute('''CREATE TABLE IF NOT EXISTS vehicles
                     (id INTEGER PRIMARY KEY AUTOINCREMENT,
                      vin TEXT UNIQUE,
                      brand TEXT,
                      model TEXT,
                      battery_capacity REAL,
                      range INTEGER,
                      production_date TEXT,
                      status TEXT)''')

        # Charging sessions
        c.execute('''CREATE TABLE IF NOT EXISTS charging_records
                     (id INTEGER PRIMARY KEY AUTOINCREMENT,
                      vin TEXT,
                      station_id TEXT,
                      start_time TEXT,
                      end_time TEXT,
                      energy_consumed REAL,
                      cost REAL)''')

        # Driving trips
        c.execute('''CREATE TABLE IF NOT EXISTS driving_records
                     (id INTEGER PRIMARY KEY AUTOINCREMENT,
                      vin TEXT,
                      start_time TEXT,
                      end_time TEXT,
                      distance REAL,
                      energy_consumed REAL,
                      avg_speed REAL)''')

        # Charging stations
        c.execute('''CREATE TABLE IF NOT EXISTS charging_stations
                     (id INTEGER PRIMARY KEY AUTOINCREMENT,
                      station_id TEXT UNIQUE,
                      name TEXT,
                      location TEXT,
                      total_ports INTEGER,
                      available_ports INTEGER,
                      status TEXT)''')

        conn.commit()
    finally:
        # Release the file handle even if a statement above raised.
        conn.close()
# Initialize the database (creates any missing tables) when this module is imported.
init_db()
# Simulated data generator
class DataGenerator:
    """Builds random demo records for vehicles and charging stations."""

    @staticmethod
    def generate_vehicle_data():
        """Return one random vehicle as a dict.

        Keys: ``vin``, ``brand``, ``model``, ``battery_capacity`` (30-100,
        one decimal), ``range`` (battery capacity times a factor of 5-7,
        truncated to int), ``production_date`` (``YYYY-MM-DD``) and ``status``.
        The model is always drawn from the chosen brand's own line-up.
        """
        brands = ['特斯拉', '比亚迪', '蔚来', '小鹏', '理想', '广汽埃安', '极氪', '哪吒']
        models = {
            '特斯拉': ['Model 3', 'Model Y', 'Model S', 'Model X'],
            '比亚迪': ['汉EV', '唐EV', '秦PLUS EV', '海豚'],
            '蔚来': ['ES6', 'ES8', 'ET7', 'ET5'],
            '小鹏': ['P7', 'G3', 'P5', 'G9'],
            '理想': ['L9', 'L8', 'L7', 'ONE'],
            '广汽埃安': ['LX', 'V', 'Y', 'S'],
            '极氪': ['001', '009'],
            '哪吒': ['U', 'V', 'S'],
        }

        brand = random.choice(brands)
        model = random.choice(models[brand])
        battery_capacity = round(random.uniform(30, 100), 1)
        vehicle_range = int(battery_capacity * random.uniform(5, 7))

        # A VIN is 17 characters long: the "LV" prefix plus 15 random digits.
        serial = random.randint(10 ** 14, 10 ** 15 - 1)

        return {
            'vin': f'LV{serial}',
            'brand': brand,
            'model': model,
            'battery_capacity': battery_capacity,
            'range': vehicle_range,
            # Day is capped at 28 so every month yields a valid date.
            'production_date': f'{random.randint(2018, 2023)}-{random.randint(1, 12):02d}-{random.randint(1, 28):02d}',
            'status': random.choice(['行驶中', '充电中', '空闲', '维修中']),
        }

    @staticmethod
    def generate_charging_station_data():
        """Return one random charging station as a dict.

        Keys: ``station_id`` ("CS" plus six digits), ``name``, ``location``,
        ``total_ports`` (4-20), ``available_ports`` (0-4, so never more than
        ``total_ports``) and ``status``.
        """
        locations = ['北京', '上海', '广州', '深圳', '杭州', '成都', '重庆', '武汉']
        prefixes = ['国家电网', '特来电', '星星充电', '南方电网', '特斯拉超充']

        return {
            'station_id': f'CS{random.randint(100000, 999999)}',
            'name': f'{random.choice(prefixes)}{random.choice(["中心站", "快充站", "超级充电站"])}',
            'location': f'{random.choice(locations)}{random.choice(["市", ""])}{random.choice(["朝阳区", "浦东新区", "天河区", "南山区", "余杭区", "高新区"])}',
            'total_ports': random.randint(4, 20),
            'available_ports': random.randint(0, 4),
            'status': random.choice(['运营中', '建设中', '维护中']),
        }
# Seed some sample data
def init_sample_data():
    """Insert demo rows into the tables that are still empty.

    Adds 100 generated vehicles when ``vehicles`` has no rows and 20 generated
    stations when ``charging_stations`` has no rows; a table that already
    holds data is left untouched. The connection is closed even if an insert
    fails, in which case nothing is committed.
    """
    conn = sqlite3.connect(DATABASE)
    try:
        c = conn.cursor()

        # Only seed when the table is empty
        c.execute("SELECT COUNT(*) FROM vehicles")
        if c.fetchone()[0] == 0:
            for _ in range(100):
                vehicle = DataGenerator.generate_vehicle_data()
                c.execute(
                    "INSERT INTO vehicles (vin, brand, model, battery_capacity, range, production_date, status) VALUES (?, ?, ?, ?, ?, ?, ?)",
                    (vehicle['vin'], vehicle['brand'], vehicle['model'], vehicle['battery_capacity'],
                     vehicle['range'], vehicle['production_date'], vehicle['status']))

        c.execute("SELECT COUNT(*) FROM charging_stations")
        if c.fetchone()[0] == 0:
            for _ in range(20):
                station = DataGenerator.generate_charging_station_data()
                c.execute(
                    "INSERT INTO charging_stations (station_id, name, location, total_ports, available_ports, status) VALUES (?, ?, ?, ?, ?, ?)",
                    (station['station_id'], station['name'], station['location'],
                     station['total_ports'], station['available_ports'], station['status']))

        conn.commit()
    finally:
        conn.close()
# Seed the demo rows at import time; init_sample_data() only inserts into empty tables.
init_sample_data()
# Real-time data update thread
class RealTimeDataUpdater(threading.Thread):
    """Daemon thread that keeps the simulated data changing.

    Each cycle assigns every vehicle a new random status, records a trip or a
    charging session to match, and nudges the free-port count of operating
    stations. A successful cycle is followed by a 60 second pause, a failed
    one by a 10 second pause. Call :meth:`stop` to end the loop after the
    current pause.
    """

    def __init__(self):
        super().__init__()
        self.daemon = True   # do not keep the interpreter alive on exit
        self.running = True  # loop flag read by run(), cleared by stop()

    def run(self):
        """Run update cycles until ``running`` becomes false."""
        while self.running:
            try:
                self._update_once()
            except Exception as e:
                # Best effort: report the problem and retry sooner than usual.
                print(f"Error in real-time data updater: {e}")
                time.sleep(10)
            else:
                # Update once per minute
                time.sleep(60)

    def stop(self):
        """Ask the loop to exit once the current cycle and pause finish."""
        self.running = False

    def _update_once(self):
        """Run one full update cycle in a single transaction."""
        conn = sqlite3.connect(DATABASE)
        try:
            c = conn.cursor()
            self._simulate_vehicle_activity(c)
            self._adjust_station_ports(c)
            conn.commit()
        finally:
            # Close on failure too, so a failing cycle does not leak the handle.
            conn.close()

    @staticmethod
    def _simulate_vehicle_activity(c):
        """Give every vehicle a random status and log the matching record."""
        c.execute("SELECT vin FROM vehicles")
        vins = [row[0] for row in c.fetchall()]

        for vin in vins:
            new_status = random.choice(['行驶中', '充电中', '空闲', '维修中'])
            c.execute("UPDATE vehicles SET status=? WHERE vin=?", (new_status, vin))

            if new_status == '行驶中':
                start_time = datetime.now() - timedelta(minutes=random.randint(1, 120))
                end_time = start_time + timedelta(minutes=random.randint(10, 180))
                distance = round(random.uniform(5, 200), 1)
                energy_consumed = round(distance * random.uniform(0.12, 0.18), 1)
                avg_speed = random.randint(30, 90)
                c.execute(
                    "INSERT INTO driving_records (vin, start_time, end_time, distance, energy_consumed, avg_speed) VALUES (?, ?, ?, ?, ?, ?)",
                    (vin, start_time.isoformat(), end_time.isoformat(), distance, energy_consumed, avg_speed))
            elif new_status == '充电中':
                # Pick a random operating station that still has a free port.
                c.execute("SELECT station_id FROM charging_stations WHERE status='运营中' AND available_ports>0 ORDER BY RANDOM() LIMIT 1")
                station = c.fetchone()
                if station:
                    station_id = station[0]
                    start_time = datetime.now() - timedelta(minutes=random.randint(1, 60))
                    end_time = start_time + timedelta(minutes=random.randint(30, 180))
                    energy_consumed = round(random.uniform(10, 60), 1)
                    cost = round(energy_consumed * random.uniform(1.2, 2.5), 2)
                    c.execute(
                        "INSERT INTO charging_records (vin, station_id, start_time, end_time, energy_consumed, cost) VALUES (?, ?, ?, ?, ?, ?)",
                        (vin, station_id, start_time.isoformat(), end_time.isoformat(), energy_consumed, cost))
                    # The session occupies one port.
                    c.execute("UPDATE charging_stations SET available_ports=available_ports-1 WHERE station_id=?", (station_id,))

    @staticmethod
    def _adjust_station_ports(c):
        """Shift each operating station's free ports by -2..2, clamped to 0..total."""
        # One query fetches everything needed instead of one lookup per station.
        c.execute("SELECT station_id, available_ports, total_ports FROM charging_stations WHERE status='运营中'")
        for station_id, available, total in c.fetchall():
            change = random.randint(-2, 2)
            new_available = max(0, min(total, available + change))
            if new_available != available:
                c.execute("UPDATE charging_stations SET available_ports=? WHERE station_id=?", (new_available, station_id))
# Start the real-time data update thread
data_updater = RealTimeDataUpdater()
data_updater.start()
# API routes
@app.route('/api/dashboard/summary', methods=['GET'])
def get_dashboard_summary():
    """Return the headline figures for the dashboard as JSON.

    The payload holds the vehicle count, vehicle counts per brand and per
    status, totals for operating charging stations (station count, ports,
    free ports, utilization percentage), today's charged energy and driven
    distance, and a timestamp. The connection is closed even if a query fails.
    """
    conn = sqlite3.connect(DATABASE)
    try:
        c = conn.cursor()

        # Total number of vehicles
        c.execute("SELECT COUNT(*) FROM vehicles")
        total_vehicles = c.fetchone()[0]

        # Count per brand
        c.execute("SELECT brand, COUNT(*) FROM vehicles GROUP BY brand")
        brands = {brand: count for brand, count in c.fetchall()}

        # Count per status
        c.execute("SELECT status, COUNT(*) FROM vehicles GROUP BY status")
        statuses = {status: count for status, count in c.fetchall()}

        # Operating stations: one pass yields the count and both port sums.
        c.execute("SELECT COUNT(*), SUM(total_ports), SUM(available_ports) FROM charging_stations WHERE status='运营中'")
        operating_stations, total_ports, available_ports = c.fetchone()
        # SUM() is NULL when no station is operating.
        total_ports = total_ports or 0
        available_ports = available_ports or 0

        today = datetime.now().date().isoformat()

        # Energy charged today
        c.execute("SELECT SUM(energy_consumed) FROM charging_records WHERE date(start_time)=?", (today,))
        today_charging = c.fetchone()[0] or 0

        # Distance driven today
        c.execute("SELECT SUM(distance) FROM driving_records WHERE date(start_time)=?", (today,))
        today_distance = c.fetchone()[0] or 0
    finally:
        conn.close()

    if total_ports > 0:
        utilization = round((total_ports - available_ports) / total_ports * 100, 1)
    else:
        utilization = 0

    return jsonify({
        'total_vehicles': total_vehicles,
        'brand_distribution': brands,
        'status_distribution': statuses,
        'charging_stations': {
            'total': operating_stations,
            'ports': {
                'total': total_ports,
                'available': available_ports,
                'utilization': utilization
            }
        },
        'today_stats': {
            'charging_energy': round(today_charging, 1),
            'distance': round(today_distance, 1)
        },
        'timestamp': datetime.now().isoformat()
    })
@app.route('/api/vehicles/list', methods=['GET'])
def get_vehicle_list():
    """Return one page of vehicles plus pagination metadata as JSON.

    Query parameters:
        page: 1-based page number, default 1. Values below 1 and
            non-integer values fall back to 1.
        page_size: rows per page, default 10. Non-integer values fall back
            to 10; the value is clamped to the range 1..100.

    Rows are ordered by ``id`` so that consecutive pages neither overlap nor
    skip rows.
    """
    max_page_size = 100

    # type=int makes a malformed value yield the default instead of raising.
    page = max(1, request.args.get('page', 1, type=int))
    # Clamping keeps the divisor below non-zero and stops a negative LIMIT
    # (which SQLite treats as "no limit") from dumping the whole table.
    page_size = min(max_page_size, max(1, request.args.get('page_size', 10, type=int)))
    offset = (page - 1) * page_size

    columns = ('id', 'vin', 'brand', 'model', 'battery_capacity',
               'range', 'production_date', 'status')

    conn = sqlite3.connect(DATABASE)
    try:
        c = conn.cursor()
        c.execute("SELECT * FROM vehicles ORDER BY id LIMIT ? OFFSET ?", (page_size, offset))
        vehicles = [dict(zip(columns, row)) for row in c.fetchall()]

        c.execute("SELECT COUNT(*) FROM vehicles")
        total = c.fetchone()[0]
    finally:
        conn.close()

    return jsonify({
        'data': vehicles,
        'pagination': {
            'page': page,
            'page_size': page_size,
            'total': total,
            # Ceiling division
            'total_pages': (total + page_size - 1) // page_size
        }
    })
@app.route('/api/charging/stations', methods=['GET'])
def get_charging_stations():
    """Return every charging station as a JSON list of objects.

    Each object carries ``id``, ``station_id``, ``name``, ``location``,
    ``total_ports``, ``available_ports`` and ``status``. The connection is
    closed even if the query fails.
    """
    # Same order as the columns of the charging_stations table.
    columns = ('id', 'station_id', 'name', 'location',
               'total_ports', 'available_ports', 'status')

    conn = sqlite3.connect(DATABASE)
    try:
        c = conn.cursor()
        c.execute("SELECT * FROM charging_stations")
        stations = [dict(zip(columns, row)) for row in c.fetchall()]
    finally:
        conn.close()

    return jsonify(stations)
@app.route('/api/charging/records', methods=['GET'])
def get_charging_records():
    """Return charging statistics for the last ``days`` days as JSON.

    Query parameters:
        days: look-back window in days, default 7. Non-integer values fall
            back to 7; the value is clamped to 0..3650 so the date
            arithmetic cannot overflow.

    The payload has ``daily_stats`` (energy and session count per day) and
    ``station_stats`` (the ten station names with the most energy delivered).
    """
    days = request.args.get('days', 7, type=int)
    days = max(0, min(days, 3650))
    start_date = (datetime.now() - timedelta(days=days)).date().isoformat()

    conn = sqlite3.connect(DATABASE)
    try:
        c = conn.cursor()

        # Energy and sessions per day
        c.execute('''SELECT date(start_time) as day, SUM(energy_consumed), COUNT(*)
                     FROM charging_records
                     WHERE date(start_time) >= ?
                     GROUP BY day
                     ORDER BY day''', (start_date,))
        daily_stats = [
            {'date': day, 'energy': round(energy or 0, 1), 'sessions': sessions}
            for day, energy, sessions in c.fetchall()
        ]

        # Per station name.
        # NOTE(review): station names are not unique, so stations that share
        # a name are merged here — confirm whether that is intended.
        c.execute('''SELECT s.name, COUNT(*) as sessions, SUM(c.energy_consumed) as energy
                     FROM charging_records c
                     JOIN charging_stations s ON c.station_id = s.station_id
                     WHERE date(c.start_time) >= ?
                     GROUP BY s.name
                     ORDER BY energy DESC
                     LIMIT 10''', (start_date,))
        station_stats = [
            {'station': name, 'sessions': sessions, 'energy': round(energy or 0, 1)}
            for name, sessions, energy in c.fetchall()
        ]
    finally:
        conn.close()

    return jsonify({
        'daily_stats': daily_stats,
        'station_stats': station_stats
    })
@app.route('/api/driving/records', methods=['GET'])
def get_driving_records():
    """Return driving statistics for the last ``days`` days as JSON.

    Query parameters:
        days: look-back window in days, default 7. Non-integer values fall
            back to 7; the value is clamped to 0..3650 so the date
            arithmetic cannot overflow.

    The payload has ``daily_stats`` (distance, energy, trip count and
    distance-per-energy efficiency per day) and ``brand_efficiency`` (average
    per-trip efficiency per brand, best first). An efficiency that cannot be
    computed because no energy was recorded is reported as 0.
    """
    days = request.args.get('days', 7, type=int)
    days = max(0, min(days, 3650))
    start_date = (datetime.now() - timedelta(days=days)).date().isoformat()

    conn = sqlite3.connect(DATABASE)
    try:
        c = conn.cursor()

        # Distance, energy and trips per day
        c.execute('''SELECT date(start_time) as day, SUM(distance), SUM(energy_consumed), COUNT(*)
                     FROM driving_records
                     WHERE date(start_time) >= ?
                     GROUP BY day
                     ORDER BY day''', (start_date,))
        daily_stats = []
        for day, distance, energy, trips in c.fetchall():
            # SUM() is NULL when every value in the group is NULL.
            distance = distance or 0
            energy = energy or 0
            daily_stats.append({
                'date': day,
                'distance': round(distance, 1),
                'energy': round(energy, 1),
                'trips': trips,
                'efficiency': round(distance / energy, 1) if energy > 0 else 0
            })

        # Average efficiency per brand
        c.execute('''SELECT v.brand, AVG(d.distance/d.energy_consumed) as avg_efficiency
                     FROM driving_records d
                     JOIN vehicles v ON d.vin = v.vin
                     WHERE date(d.start_time) >= ?
                     GROUP BY v.brand
                     ORDER BY avg_efficiency DESC''', (start_date,))
        brand_efficiency = [
            # SQLite yields NULL for x/0, so the average itself can be NULL.
            {'brand': brand, 'efficiency': round(avg, 1) if avg is not None else 0}
            for brand, avg in c.fetchall()
        ]
    finally:
        conn.close()

    return jsonify({
        'daily_stats': daily_stats,
        'brand_efficiency': brand_efficiency
    })
@app.route('/api/map/vehicles', methods=['GET'])
def get_map_vehicles():
    """Return up to 50 driving or charging vehicles with simulated map data.

    Position and battery level are not stored anywhere: every request draws
    a fresh random latitude (21-42), longitude (98-122) and battery
    percentage (10-90) for each vehicle. The connection is closed even if
    the query fails.
    """
    conn = sqlite3.connect(DATABASE)
    try:
        c = conn.cursor()
        c.execute("SELECT vin, brand, model, status FROM vehicles WHERE status IN ('行驶中', '充电中') LIMIT 50")
        rows = c.fetchall()
    finally:
        conn.close()

    vehicles = []
    for vin, brand, model, status in rows:
        # Random position inside a bounding box roughly covering China
        lat = round(random.uniform(21.0, 42.0), 6)
        lng = round(random.uniform(98.0, 122.0), 6)
        vehicles.append({
            'vin': vin,
            'brand': brand,
            'model': model,
            'status': status,
            'location': [lat, lng],
            'battery': random.randint(10, 90)
        })

    return jsonify(vehicles)
@app.route('/api/map/stations', methods=['GET'])
def get_map_stations():
    """Return operating charging stations with map coordinates and utilization.

    No real coordinates are stored, so they are simulated. The simulated
    point is derived from ``station_id``, which keeps each station at the
    same place on every request instead of jumping around the map. A real
    deployment should store true latitude/longitude. The connection is
    closed even if the query fails.
    """
    conn = sqlite3.connect(DATABASE)
    try:
        c = conn.cursor()
        c.execute("SELECT station_id, name, location, available_ports, total_ports FROM charging_stations WHERE status='运营中'")
        rows = c.fetchall()
    finally:
        conn.close()

    stations = []
    for station_id, name, location, available_ports, total_ports in rows:
        # A private generator seeded with the station id gives a stable
        # pseudo-position without disturbing the module-level random state.
        rng = random.Random(station_id)
        lat = round(rng.uniform(21.0, 42.0), 6)
        lng = round(rng.uniform(98.0, 122.0), 6)

        if total_ports > 0:
            utilization = round((total_ports - available_ports) / total_ports * 100, 1)
        else:
            utilization = 0

        stations.append({
            'id': station_id,
            'name': name,
            'location': location,
            'coordinates': [lat, lng],
            'available_ports': available_ports,
            'total_ports': total_ports,
            'utilization': utilization
        })

    return jsonify(stations)
if __name__ == '__main__':
    import os

    # The Werkzeug debugger lets a client execute arbitrary code, so it must
    # not be enabled unconditionally while listening on every interface.
    # Opt in for local development with FLASK_DEBUG=1.
    debug_enabled = os.environ.get('FLASK_DEBUG', '0') == '1'
    app.run(host='0.0.0.0', port=5000, debug=debug_enabled)


# 2. Database initialization script (init_db.py)
import sqlite3
from app import DATABASE, DataGenerator
def init_database():
    """Drop and recreate all four tables, then fill them with fresh demo data.

    Any existing data is destroyed. Afterwards ``vehicles`` holds 100
    generated rows and ``charging_stations`` holds 20; the two record tables
    start empty. The connection is closed even if a statement fails, in which
    case the success message is not printed.
    """
    table_definitions = {
        'vehicles': '''CREATE TABLE vehicles
                       (id INTEGER PRIMARY KEY AUTOINCREMENT,
                        vin TEXT UNIQUE,
                        brand TEXT,
                        model TEXT,
                        battery_capacity REAL,
                        range INTEGER,
                        production_date TEXT,
                        status TEXT)''',
        'charging_records': '''CREATE TABLE charging_records
                               (id INTEGER PRIMARY KEY AUTOINCREMENT,
                                vin TEXT,
                                station_id TEXT,
                                start_time TEXT,
                                end_time TEXT,
                                energy_consumed REAL,
                                cost REAL)''',
        'driving_records': '''CREATE TABLE driving_records
                              (id INTEGER PRIMARY KEY AUTOINCREMENT,
                               vin TEXT,
                               start_time TEXT,
                               end_time TEXT,
                               distance REAL,
                               energy_consumed REAL,
                               avg_speed REAL)''',
        'charging_stations': '''CREATE TABLE charging_stations
                                (id INTEGER PRIMARY KEY AUTOINCREMENT,
                                 station_id TEXT UNIQUE,
                                 name TEXT,
                                 location TEXT,
                                 total_ports INTEGER,
                                 available_ports INTEGER,
                                 status TEXT)''',
    }

    conn = sqlite3.connect(DATABASE)
    try:
        c = conn.cursor()

        # Drop the existing tables. The names come from the literal dict
        # above, never from user input, so formatting them in is safe.
        for table_name in table_definitions:
            c.execute(f"DROP TABLE IF EXISTS {table_name}")

        # Recreate the tables
        for ddl in table_definitions.values():
            c.execute(ddl)

        # Insert the simulated data
        for _ in range(100):
            vehicle = DataGenerator.generate_vehicle_data()
            c.execute(
                "INSERT INTO vehicles (vin, brand, model, battery_capacity, range, production_date, status) VALUES (?, ?, ?, ?, ?, ?, ?)",
                (vehicle['vin'], vehicle['brand'], vehicle['model'], vehicle['battery_capacity'],
                 vehicle['range'], vehicle['production_date'], vehicle['status']))

        for _ in range(20):
            station = DataGenerator.generate_charging_station_data()
            c.execute(
                "INSERT INTO charging_stations (station_id, name, location, total_ports, available_ports, status) VALUES (?, ?, ?, ?, ?, ?)",
                (station['station_id'], station['name'], station['location'],
                 station['total_ports'], station['available_ports'], station['status']))

        conn.commit()
    finally:
        conn.close()

    print("数据库初始化完成!")
if __name__ == '__main__':
    init_database()


# 3. Configuration file (config.py)
import os

# Absolute path of the directory that contains this file
basedir = os.path.abspath(os.path.dirname(__file__))
class Config:
    """Settings shared by every environment."""

    # Base settings: the secret key comes from the environment when set to a
    # non-empty value, otherwise the development fallback is used.
    SECRET_KEY = os.getenv('SECRET_KEY') or 'dev-key-123'
    DATABASE = os.path.join(basedir, 'new_energy_vehicles.db')

    # API settings
    API_PREFIX = '/api'
    API_VERSION = 'v1'

    # Data update interval, in seconds
    DATA_UPDATE_INTERVAL = 60

    # Logging settings
    LOG_FILE = 'nevs.log'
    LOG_LEVEL = 'INFO'

    @staticmethod
    def init_app(app):
        """Hook for per-application setup; does nothing in the base class."""
        return None
class DevelopmentConfig(Config):
    """Development settings: debug on, DEBUG-level logging."""

    DEBUG = True
    LOG_LEVEL = 'DEBUG'


class ProductionConfig(Config):
    """Production settings: debug off, everything else inherited."""

    DEBUG = False


# Maps an environment name to its settings class; 'default' is development.
config = dict(
    development=DevelopmentConfig,
    production=ProductionConfig,
    default=DevelopmentConfig,
)

# 4. Front-end API usage examples (JavaScript, calling the API with axios)
// 1. 获取仪表板摘要数据 axios.get('/api/dashboard/summary') .then(response => { console.log('仪表板数据:', response.data); // 处理数据... }) .catch(error => { console.error('获取仪表板数据失败:', error); });
// 2. 获取车辆列表 axios.get('/api/vehicles/list', { params: { page: 1, page_size: 10 } }) .then(response => { console.log('车辆列表:', response.data); // 处理数据... });
// 3. 获取充电记录统计 axios.get('/api/charging/records', { params: { days: 30 } }) .then(response => { console.log('充电记录:', response.data); // 处理数据... });
// 4. 获取地图数据 Promise.all([ axios.get('/api/map/vehicles'), axios.get('/api/map/stations') ]) .then(([vehiclesRes, stationsRes]) => { console.log('地图车辆数据:', vehiclesRes.data); console.log('地图充电站数据:', stationsRes.data); // 处理数据... }); 系统功能说明 数据概览仪表板:
显示车辆总数、品牌分布、状态分布
充电站统计信息
今日充电量和行驶里程
车辆管理:
车辆列表分页查询
车辆状态实时更新
充电管理:
充电站信息查询
充电记录统计
充电站利用率分析
行驶数据分析:
行驶里程统计
能耗效率分析
品牌能耗对比
地图可视化:
车辆实时位置显示
充电站位置及状态显示
部署说明
安装依赖(sqlite3 是 Python 标准库自带的模块,无需也无法通过 pip 安装):
pip install flask flask-cors
初始化数据库:
python init_db.py
启动服务:
python app.py
访问API:
仪表板数据: http://localhost:5000/api/dashboard/summary
车辆列表: http://localhost:5000/api/vehicles/list
充电站地图数据: http://localhost:5000/api/map/stations
这个系统提供了新能源汽车监控和分析所需的基本API接口,前端可以通过这些接口获取数据并实现可视化大屏展示。