import time
import threading
from datetime import datetime
class SnowflakeGenerator:
def __init__(self, datacenter_id, worker_id, epoch=datetime(2020, 1, 1)):
if not (0 <= datacenter_id <= 31):
raise ValueError("Datacenter ID must be between 0 and 31")
if not (0 <= worker_id <= 31):
raise ValueError("Worker ID must be between 0 and 31")
self.datacenter_id = datacenter_id
self.worker_id = worker_id
self.epoch = int(epoch.timestamp() * 1000) # 转换为毫秒时间戳
self.last_timestamp = -1
self.sequence = 0
self.lock = threading.Lock()
def _current_time(self):
"""获取当前时间与epoch的时间差(毫秒)"""
return int(time.time() * 1000) - self.epoch
def _wait_next_millis(self, last_timestamp):
"""等待直到下一毫秒"""
current_time = self._current_time()
while current_time <= last_timestamp:
time.sleep(0.001)
current_time = self._current_time()
return current_time
def generate_id(self):
with self.lock:
current_time = self._current_time()
if current_time < self.last_timestamp:
raise ValueError(f"Clock moved backwards. Refusing to generate ID for {self.last_timestamp - current_time} milliseconds")
if current_time == self.last_timestamp:
self.sequence = (self.sequence + 1) & 0xFFF # 12位序列号
if self.sequence == 0: # 当前毫秒序列号用完
current_time = self._wait_next_millis(current_time)
else:
self.sequence = 0
self.last_timestamp = current_time
# 组合各部分生成ID
return (current_time << 22) | \
(self.datacenter_id << 17) | \
(self.worker_id << 12) | \
self.sequence
# 使用示例
if __name__ == "__main__":
# 初始化生成器(数据中心ID和工作者ID可根据实际部署配置)
snowflake = SnowflakeGenerator(datacenter_id=1, worker_id=1)
# 生成10个ID并打印
for _ in range(10):
print(snowflake.generate_id())
关键特性说明:
- 唯一性保证:
- 41位时间戳(约69年)
- 5位数据中心ID(0-31)
- 5位工作者ID(0-31)
- 12位序列号(每毫秒4096个ID)
- 时钟回拨处理:
- 检测到系统时间回退时直接抛出异常
- 序列号耗尽时自动等待至下一毫秒
- 线程安全:
- 使用互斥锁确保多线程环境下的正确性
- 可定制性:
- 可自定义epoch起始时间
- 独立配置数据中心和工作节点
使用建议:
- 分布式环境中应为每个节点分配独立的
datacenter_id
和worker_id
- 部署NTP服务保持系统时钟同步
- 捕获时钟回拨异常并做适当处理(如重试机制)
该实现严格遵循Twitter雪花算法规范,能够满足大多数分布式系统的唯一ID生成需求。