SQLite是安卓数据持久化的重要手段. 为了便捷的操作sql, 衍生了很多ORM (Object/Relation Mapping 对象关系映射). Android端比较流行的有 GreenDao, Realm, LitePal等等.
Google在 AAC 中也推出了自己的 ORM 框架 Room. 除了基础的 runtime 包, apt注解包. 还提供了对 AAC LiveData 和老牌 RxJava 两种观察者模式框架的扩展.
dependencies {
def room_version = "2.2.0-rc01"
implementation "androidx.room:room-runtime:$room_version" //Room
//编译期注解, kotlin使用kapt代替annotationProcessor
annotationProcessor "androidx.room:room-compiler:$room_version"
kapt "androidx.room:room-compiler:$rroom_version"
// ktx扩展和协程
implementation "androidx.room:room-ktx:$room_version"
// Room的RxJava扩展
implementation "androidx.room:room-rxjava2:$room_version"
// Room的的Google guava扩展
implementation "androidx.room:room-guava:$room_version"
// Test helpers
testImplementation "androidx.room:room-testing:$room_version"
}
和别的 ORM 框架一样, Room 使用了大量编译时注解帮助我们生成模板代码 (Java/annotationProcesser 和 Kotlin/kapt). 包括主要的类注解 :
- DataBase : 数据库实例
- Dao : 数据访问对象, 维护了数据库增删改查的方法
- Entity : 映射表结构的对象
使用方法和别的 ORM 差异不大, 具体参考官方文档. 这里介绍的是 Room 的数据跨进程共享的实现.
ContentProvider
提到数据库跨进程, 首先想到的肯定是ContentProvider, 他用了两套C/S模型:
- 多进程操作数据库
多个进程(客户端)连接ContentProvider(服务端), ContentProvider 提供Binder实现给客户端调用. - 跨进程数据共享
当一个进程操作ContentProvider变更数据之后,可能希望其他进程能收到通知.
客户端通过getContentResolver().registerContentObserver()
注册ContentObserver, 他提供 Binder 并把 Binder 交给系统级别的服务 ContentService, 更新就能通过服务端分发到各个客户端.
Room
Room 也支持跨进程数据共享, 只需在构造对象的时候添加 enableMultiInstanceInvalidation()
选项
@Synchronized
fun getDataBase(): AppDataBase {
return Room.databaseBuilder(
App.instance(),
AppDataBase::class.java, "databasenameXXXX"
)
.enableMultiInstanceInvalidation()
.build()
}
他的实现原理是什么呢? 看官网的一段注释 :
如果你的APP运行在单进程中, 你应该设置 Room 为单例, 因为获取 RoomDatabase 实例的成本较高, 且很少需要在一个进程中创建多个实例.
如果你的APP是多进程的, 在构造 database 时配置enableMultiInstanceInvalidation()
选项, 那么不同进程就会根据同一个db文件创建属于各自进程的单独实例. 并且当一个进程中的实例失效(发生变化)时, 会自动将失效传播到别的进程中.
初始化
在 db.init()
初始化时, 将多进程的处理交给 InvalidationTracker 对象.
public abstract class RoomDatabase {
public static class Builder<T extends RoomDatabase> {
private boolean mMultiInstanceInvalidation;
@NonNull
public Builder<T> enableMultiInstanceInvalidation() {
mMultiInstanceInvalidation = mName != null;
return this;
}
public T build() {
....
//DatabaseConfiguration保存配置项
DatabaseConfiguration configuration =
new DatabaseConfiguration(....mMultiInstanceInvalidation....);
T db = Room.getGeneratedImplementation(mDatabaseClass, DB_IMPL_SUFFIX);
db.init(configuration); //初始化db
return db;
}
}
@CallSuper
public void init(@NonNull DatabaseConfiguration configuration) {
mOpenHelper = createOpenHelper(configuration);
....
//如果配置了多进程选项
if (configuration.multiInstanceInvalidation) {
//使用 mInvalidationTracker 来处理多进程的情况
mInvalidationTracker.startMultiInstanceInvalidation(configuration.context,
configuration.name);
}
}
}
InvalidationTracker 在apt帮我们生成的 RoomDatabase 实现 AppDataBase_Impl
中创建. 他和 RoomDatabase 对象是一对一的关系.
public final class AppDataBase_Impl extends AppDataBase {
@Override
protected InvalidationTracker createInvalidationTracker() {
final HashMap<String, String> _shadowTablesMap = new HashMap<String, String>(0);
HashMap<String, Set<String>> _viewTables = new HashMap<String, Set<String>>(0);
return new InvalidationTracker(this, _shadowTablesMap, _viewTables, "nodemodel","user_profile");
}
}
mInvalidationTracker.startMultiInstanceInvalidation
中创建了 MultiInstanceInvalidationClient 对象.
public class InvalidationTracker {
private MultiInstanceInvalidationClient mMultiInstanceInvalidationClient;
//name是数据库的名字, executor是一个异步的线程池
void startMultiInstanceInvalidation(Context context, String name) {
mMultiInstanceInvalidationClient = new MultiInstanceInvalidationClient(context, name, this,
mDatabase.getQueryExecutor());
}
}
在 MultiInstanceInvalidationClient
的构造方法中使用了 bindService()
连接MultiInstanceInvalidationService
, 这里使用了绑定服务来进行一个 Service 和多个进程的 Client 的IPC.
/**
* Handles all the communication from {@link RoomDatabase} and {@link InvalidationTracker} to
* {@link MultiInstanceInvalidationService}.
*/
class MultiInstanceInvalidationClient {
MultiInstanceInvalidationClient(Context context, String name,
InvalidationTracker invalidationTracker, Executor executor) {
mContext = context.getApplicationContext();
mName = name;
mInvalidationTracker = invalidationTracker;
mExecutor = executor;
mObserver = new InvalidationTracker.Observer(invalidationTracker.mTableNames) {
@Override
public void onInvalidated(@NonNull Set<String> tables) {
if (mStopped.get()) {
return;
}
try {
mService.broadcastInvalidation(mClientId,
tables.toArray(new String[0]));
} catch (RemoteException e) {
Log.w(Room.LOG_TAG, "Cannot broadcast invalidation", e);
}
}
@Override
boolean isRemote() {
return true;
}
};
Intent intent = new Intent(mContext, MultiInstanceInvalidationService.class);
mContext.bindService(intent, mServiceConnection, Context.BIND_AUTO_CREATE);
}
}
绑定服务介绍 :
创建提供绑定的服务时,您必须提供 IBinder
,进而提供编程接口,以便客户端使用此接口与服务进行交互. 您可以通过三种方法定义接口 :
Room 中使用的是 AIDL, 使用必须执行以下步骤:
- 创建 aidl 接口文件, 在文件内声明方法
- 编译时会根据 adil 生成 XXInterface.java 接口文件 , 内部类 XXInterface.Stub 实现了 IBinder 接口并声明了 aidl 内定义的方法
- Service 的
public IBinder onBind(Intent intent)
方法 , 返回一个 XXInterface.Stub 的实现类实例, 其定义了服务的远程过程调用 (RPC) 接口, 给客户端调用. - Client 调用
bindService()
连接 Service, 在连接成功的onServiceConnected(ComponentName name, IBinder service)
中将 Service 的 IBinder 转换为 XXInterface 对象. 通过这个对象就可以调用 Service 中定义的方法实现多个 Client 和 Service 的跨进程通信.
MultiInstanceInvalidationClient
对于同一个db文件, 每个进程可以创建自己的 RoomDatabase
对象. 在 RoomDataBase
中创建了 InvalidationTracker
, 在 InvalidationTracker
创建了 MultiInstanceInvalidationClient
.
Client 的构造方法中通过 bindService()
绑定服务,Service#Binder 向 Client 提供跨进程调用的三个方法 :
- 绑定服务成功后调用 Service#Binder 的
service.registerCallback()
方法. - 在构造中的 mObserver 里调用
mService.broadcastInvalidation()
. - 在客户端的 stop() 方法中调用
service.unregisterCallback(mCallback, mClientId)
class MultiInstanceInvalidationClient {
@Nullable
IMultiInstanceInvalidationService mService; //Service中的Binder对象
MultiInstanceInvalidationClient(Context context, String name,
InvalidationTracker invalidationTracker, Executor executor) {
mContext = context.getApplicationContext();
mName = name;
mInvalidationTracker = invalidationTracker;
mExecutor = executor;
mObserver = new InvalidationTracker.Observer(invalidationTracker.mTableNames) {
@Override
public void onInvalidated(@NonNull Set<String> tables) {
if (mStopped.get()) {
return;
}
try {
//Client 跨进程调用 Service 的传播更新方法
mService.broadcastInvalidation(mClientId,
tables.toArray(new String[0]));
} catch (RemoteException e) {
Log.w(Room.LOG_TAG, "Cannot broadcast invalidation", e);
}
}
@Override
boolean isRemote() {
return true;
}
};
//绑定服务
Intent intent = new Intent(mContext, MultiInstanceInvalidationService.class);
mContext.bindService(intent, mServiceConnection, Context.BIND_AUTO_CREATE);
}
final ServiceConnection mServiceConnection = new ServiceConnection() {
@Override
public void onServiceConnected(ComponentName name, IBinder service) {
mService = IMultiInstanceInvalidationService.Stub.asInterface(service);
//连接服务端成功
mExecutor.execute(mSetUpRunnable);
}
@Override
public void onServiceDisconnected(ComponentName name) {
mExecutor.execute(mRemoveObserverRunnable);
mService = null;
mContext = null;
}
};
final Runnable mSetUpRunnable = new Runnable() {
@Override
public void run() {
try {
final IMultiInstanceInvalidationService service = mService;
if (service != null) {
//调用Service#Binder的 registerCallback() 方法
mClientId = service.registerCallback(mCallback, mName);
//mInvalidationTracker添加Observer
mInvalidationTracker.addObserver(mObserver);
}
} catch (RemoteException e) {
Log.w(Room.LOG_TAG, "Cannot register multi-instance invalidation callback", e);
}
}
};
final Runnable mRemoveObserverRunnable = new Runnable() {
@Override
public void run() {
//mInvalidationTracker移除Observer
mInvalidationTracker.removeObserver(mObserver);
}
};
}
MultiInstanceInvalidationService
MultiInstanceInvalidationService
作为服务端可以向多个进程中的客户端MultiInstanceInvalidationClient
提供绑定服务.
IMultiInstanceInvalidationService.aidl 作为 AIDL 接口, 定义了三个接口方法给 Service 实现 :
interface IMultiInstanceInvalidationService {
//注册回调
int registerCallback(IMultiInstanceInvalidationCallback callback, String name);
//反注册回调
void unregisterCallback(IMultiInstanceInvalidationCallback callback, int clientId);
//传播失效, 即进程间同步
oneway void broadcastInvalidation(int clientId, in String[] tables);
}
1. registerCallback() 注册
registerCallback()
涉及到两个重要的变量 mCallbackList
和 mClientNames
.
-
mCallbackList
是个列表, 用于保存服务端对多个远程客户端的callBack. 整个通知更新的过程分为两步:
Client 通过 Service#Binder 和 Service 建立连接并调用 Service#Binder 的broadcastInvalidation()
方法让 Service 来传播更新.
Service 通知其他 Client 更新的过程也涉及到 IPC, 这次由每个 Client 提供 Client#Binder 供 Serivce 调用,mCallbackList
就是用于保存每个客户端的 Binder.
public class MultiInstanceInvalidationService extends Service {
final RemoteCallbackList<IMultiInstanceInvalidationCallback> mCallbackList =
new RemoteCallbackList<IMultiInstanceInvalidationCallback>() {
@Override
public void onCallbackDied(IMultiInstanceInvalidationCallback callback,
Object cookie) {
//如果Client的Binder挂掉了, 就从mClientNames移除
mClientNames.remove((int) cookie);
}
};
// Service 提供给Client的 Binder : IMultiInstanceInvalidationService.Stub
private final IMultiInstanceInvalidationService.Stub mBinder =
new IMultiInstanceInvalidationService.Stub() {
// Assigns a client ID to the client.
@Override
public int registerCallback(IMultiInstanceInvalidationCallback callback,
String name) {
if (name == null) {
return 0;
}
synchronized (mCallbackList) {
int clientId = ++mMaxClientId;
// Use the client ID as the RemoteCallbackList cookie.
if (mCallbackList.register(callback, clientId)) {
mClientNames.append(clientId, name);
return clientId;
} else {
--mMaxClientId;
return 0;
}
}
}
}
// 客户端提供Binder: IMultiInstanceInvalidationCallback.Stub 给服务端调用
class MultiInstanceInvalidationClient {
final IMultiInstanceInvalidationCallback mCallback =
new IMultiInstanceInvalidationCallback.Stub() {
@Override
public void onInvalidation(final String[] tables) {
mExecutor.execute(new Runnable() {
@Override
public void run() {
mInvalidationTracker.notifyObserversByTableNames(tables);
}
});
}
};
}
-
mClientNames
使用 SparseArrayCompat<String> 保存所有连接他的客户端名称, 调用mClientNames.append(clientId, name)
把客户端加入集合.
key : clientId 由Service分配, 每次+1.
value : clientName 就是 RoomDatabase 数据库的名字.
2. unregisterCallback() 反注册
从 mCallbackList
, mClientNames
集合移除对应的客户端.
@Override
public void unregisterCallback(IMultiInstanceInvalidationCallback callback,
int clientId) {
synchronized (mCallbackList) {
mCallbackList.unregister(callback);
mClientNames.remove(clientId);
}
}
3. broadcastInvalidation 传播更新(将一个客户端的变化传播到别的客户端)
callback
作为 Client#Binder, 当 Client 调用过 Service#Binder 的 registerCallback()
后, 就将 callBack
添加到了 Service 的 mCallbackList
中.
当 Client 调用 Service#Binder 的 broadcastInvalidation()
传播更新时, 遍历 mCallbackList
调用 Client#Binder 的 callback.onInvalidation()
就将更新的逻辑通知到了各个 Client 的 onInvalidation()
实现.
// Broadcasts table invalidation to other instances of the same database file.
// The broadcast is not sent to the caller itself.
@Override
public void broadcastInvalidation(int clientId, String[] tables) {
//服务端和客户端是1/N的关系, 存在竞态, 方法加锁
synchronized (mCallbackList) {
String name = mClientNames.get(clientId);
if (name == null) {
Log.w(Room.LOG_TAG, "Remote invalidation client ID not registered");
return;
}
int count = mCallbackList.beginBroadcast();
try {
//遍历mCallbackList
for (int i = 0; i < count; i++) {
int targetClientId = (int) mCallbackList.getBroadcastCookie(i);
String targetName = mClientNames.get(targetClientId);
//调用这个方法的客户端不需要参与同步, name不同即不是一个数据库文件的客户端不需要同步
if (clientId == targetClientId // This is the caller itself.
|| !name.equals(targetName)) { // Not the same file.
continue;
}
try {
//这里涉及到服务端调用客户端的过程, 也是通过aidl
IMultiInstanceInvalidationCallback callback =
mCallbackList.getBroadcastItem(i);
callback.onInvalidation(tables);
} catch (RemoteException e) {
Log.w(Room.LOG_TAG, "Error invoking a remote callback", e);
}
}
} finally {
mCallbackList.finishBroadcast();
}
}
}
遍历时过滤掉不需要同步的客户端
-
clientId == targetClientId
发起传播的客户端肯定不需要通知自己 -
!name.equals(targetName)
一个APP可以有多个本地db文件, 我们需要同步的是同一个db文件在不同进程中的 RoomDatabase 对象对应的MultiInstanceInvalidationClient
(他的name就是数据库的名字).
如果 name 不同, 意味着不是同一个db文件的Client, 不需要参与同步.
传播更新
传播更新由 MultiInstanceInvalidationService
调用 MultiInstanceInvalidationClient
中的 Binder 方法来实现, 同样是通过AIDL.
IMultiInstanceInvalidationCallback.aidl 只定义了一个方法 :
interface IMultiInstanceInvalidationCallback {
oneway void onInvalidation(in String[] tables);
}
MultiInstanceInvalidationClient
中的实现 :
class MultiInstanceInvalidationClient {
final IMultiInstanceInvalidationCallback mCallback =
new IMultiInstanceInvalidationCallback.Stub() {
@Override
public void onInvalidation(final String[] tables) {
mExecutor.execute(new Runnable() {
@Override
public void run() {
//将数据库中表的更新通知给观察者
mInvalidationTracker.notifyObserversByTableNames(tables);
}
});
}
};
}
所以 MultiInstanceInvalidationService # broadcastInvalidation()
→ MultiInstanceInvalidationClient # callback.onInvalidation()
→ mInvalidationTracker.notifyObserversByTableNames(tables)
, 最终更新会交给每个Client 对应的 InvalidationTracker 处理.
接收更新的MultiInstanceInvalidationClient
InvalidationTracker 中掉用 notifyObserversByTableNames(String... tables)
通知观察者的集合 mObserverMap
.
public class InvalidationTracker {
public void notifyObserversByTableNames(String... tables) {
synchronized (mObserverMap) {
//mObserverMap 中保存了当前进程的数据库观察者
for (Map.Entry<Observer, ObserverWrapper> entry : mObserverMap) {
//!entry.getKey().isRemote() 排除了Client构造方法内的Observer
if (!entry.getKey().isRemote()) {
//这里的tables就是当前进程修改过, 需要同步到别的进程的表
entry.getValue().notifyByTableNames(tables);
}
}
}
}
static class ObserverWrapper {
void notifyByTableNames(String[] tables) {
Set<String> invalidatedTables = null;
if (mTableNames.length == 1) {
for (String table : tables) {
if (table.equalsIgnoreCase(mTableNames[0])) {
// Optimization for a single-table observer
invalidatedTables = mSingleTableSet;
break;
}
}
} else {
ArraySet<String> set = new ArraySet<>();
for (String table : tables) { //遍历传播更新的Client修改过的表
for (String ourTable : mTableNames) { //遍历当前Client的所有表
if (ourTable.equalsIgnoreCase(table)) { //只把需要更新的表加入set集合
set.add(ourTable);
break;
}
}
}
if (set.size() > 0) {
invalidatedTables = set;
}
}
if (invalidatedTables != null) { //通知观察者更新表
mObserver.onInvalidated(invalidatedTables);
}
}
}
}
Q: 那么mObserverMap
中的 InvalidationTracker.Observer 在哪里被添加呢?**
A: InvalidationTracker.Observer 就是观察者, 他是一个抽象类, 在 LiveData 和 RxJava 两种观察者模式框架中都提供了他的实现.
我们以 RxJava 为例, 举个例子 :
@Dao
interface NodeModelDao {
@Query("SELECT * from nodemodel")
fun getAll(): Flowable<List<NodeModel>>
}
查看 kapt 编译后生成的 NodeModelDaoImpl.java 文件, 可以看到 Room 帮助我们在Callable
的 call()
方法中生成了查询的Sql操作.
public final class NodeModelDao_Impl implements NodeModelDao {
@Override
public Flowable<List<NodeModel>> getAll() {
final String _sql = "SELECT * from nodemodel";
final RoomSQLiteQuery _statement = RoomSQLiteQuery.acquire(_sql, 0);
return RxRoom.createFlowable(__db, new String[]{"nodemodel"}, new Callable<List<NodeModel>>() {
@Override
public List<NodeModel> call() throws Exception {
final Cursor _cursor = DBUtil.query(__db, _statement, false);
try {
.....
_item = new NodeModel(_tmpUid,_tmpId,_tmpName,_tmpTitle,_tmpTitleAlternative,_tmpUrl,_tmpTopics,_tmpHeader,_tmpFooter,_tmpIsCollected);
_result.add(_item);
return _result;
} finally {
_cursor.close();
}
}
}
}
查看 RxRoom.createFlowable
方法, 他的内部会创建一个 InvalidationTracker.Observer
实例 observer, 接着调用 database.getInvalidationTracker().addObserver(observer)
将 observer 添加到 InvalidationTracker 的 mObserverMap
中. observer 的 onInvalidated()
方法会调用 emitter.onNext(NOTHING)
再次发射, 发射后会调用 flatMapMaybe()
转换流为 maybe
返回给观察者, 而 maybe = Maybe.fromCallable(callable)
, Callable 内的 call()
实现就是上面提到的Sql查询操作.
public class RxRoom {
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public static <T> Flowable<T> createFlowable(final RoomDatabase database,
final String[] tableNames, final Callable<T> callable) {
Scheduler scheduler = Schedulers.from(database.getQueryExecutor());
final Maybe<T> maybe = Maybe.fromCallable(callable);
return createFlowable(database, tableNames)
.observeOn(scheduler)
.flatMapMaybe(new Function<Object, MaybeSource<T>>() {
@Override
public MaybeSource<T> apply(Object o) throws Exception {
return maybe;
}
});
}
public static Flowable<Object> createFlowable(final RoomDatabase database,
final String... tableNames) {
return Flowable.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(final FlowableEmitter<Object> emitter) throws Exception {
//创建InvalidationTracker.Observer实例
final InvalidationTracker.Observer observer = new InvalidationTracker.Observer(
tableNames) {
@Override
public void onInvalidated(@androidx.annotation.NonNull Set<String> tables) {
if (!emitter.isCancelled()) {
//再次发射
emitter.onNext(NOTHING);
}
}
};
if (!emitter.isCancelled()) {
database.getInvalidationTracker().addObserver(observer);
emitter.setDisposable(Disposables.fromAction(new Action() {
@Override
public void run() throws Exception {
database.getInvalidationTracker().removeObserver(observer);
}
}));
}
// emit once to avoid missing any data and also easy chaining
if (!emitter.isCancelled()) {
emitter.onNext(NOTHING);
}
}
}, BackpressureStrategy.LATEST);
}
}
当别的进程的 Client
修改了数据, Service
会将更新传播到所有进程相同名字的 Client
. 每个进程的 Client
对应的 InvalidationTracker 会将需要更新的表 tables 交给内部维护的集合 mObserverMap
处理.
InvalidationTracker.Observer 是个抽象类, 我们使用 LiveData/RxJava 作为 Room 查询的观察者实现时, 会实现 InvalidationTracker.Observer 对象并把它添加到 InvalidationTracker 的 mObserverMap
集合中. 遍历集合并调用 InvalidationTracker.Observer 的 onInvalidated()
实现, 就可以让上游再次发送消息, 再次触发下游观察者的订阅, 到这里更新的流程就结束了.
发起更新的MultiInstanceInvalidationClient
我们已经完成了 Service 通过 Client#Binder 向各个 Client 传播更新的IPC流程, 以及 每个 Client 的 InvalidationTracker 又通过 mObserverMap
通知自己的观察者的流程. 接下来我们只要知道 Service#broadcastInvalidation()
在什么情况下会被调用.
查看Client的构造方法, 这里初始化的 InvalidationTracker.Observer 的 onInvalidated()
中调用了 mService.broadcastInvalidation()
. 所以只需要知道这个 observer 的方法什么时候被调用.
MultiInstanceInvalidationClient(Context context, String name,
InvalidationTracker invalidationTracker, Executor executor) {
mContext = context.getApplicationContext();
mName = name;
mInvalidationTracker = invalidationTracker;
mExecutor = executor;
mObserver = new InvalidationTracker.Observer(invalidationTracker.mTableNames) {
@Override
public void onInvalidated(@NonNull Set<String> tables) {
if (mStopped.get()) {
return;
}
try {
mService.broadcastInvalidation(mClientId,
tables.toArray(new String[0]));
} catch (RemoteException e) {
Log.w(Room.LOG_TAG, "Cannot broadcast invalidation", e);
}
}
@Override
boolean isRemote() {
return true;
}
};
Intent intent = new Intent(mContext, MultiInstanceInvalidationService.class);
mContext.bindService(intent, mServiceConnection, Context.BIND_AUTO_CREATE);
}
查看apt编译后生成的XXDao_Impl, 可以看到包括插入,删除,修改等涉及到写的操作都是通过事务 transcation 来完成的. 这很好理解 写入操作要保证原子性.
public final class NodeModelDao_Impl implements NodeModelDao {
@Override
public void insertAll(List<NodeModel> userEntities) {
__db.beginTransaction();
try {
__insertionAdapterOfNodeModel.insert(userEntities);
__db.setTransactionSuccessful();
} finally {
__db.endTransaction();
}
}
@Override
public void deleteAll() {
final SupportSQLiteStatement _stmt = __preparedStmtOfDeleteAll.acquire();
__db.beginTransaction();
try {
_stmt.executeUpdateDelete();
__db.setTransactionSuccessful();
} finally {
__db.endTransaction();
__preparedStmtOfDeleteAll.release(_stmt);
}
}
}
RoomDatabase # endTransaction() 在事务结束的方法中, 调用了 mInvalidationTracker.refreshVersionsAsync()
public void endTransaction() {
mOpenHelper.getWritableDatabase().endTransaction();
if (!inTransaction()) {
// enqueue refresh only if we are NOT in a transaction. Otherwise, wait for the last
// endTransaction call to do it.
mInvalidationTracker.refreshVersionsAsync();
}
}
InvalidationTracker # refreshVersionsAsync() , 遍历 mObserverMap
, map里包含两种Observer
- Client 构造方法内用于IPC调用
mService.broadcastInvalidation()
的Observer - Client 内做查询操作的观察者的Observer
public void refreshVersionsAsync() {
// TODO we should consider doing this sync instead of async.
if (mPendingRefresh.compareAndSet(false, true)) {
mDatabase.getQueryExecutor().execute(mRefreshRunnable);
}
}
@VisibleForTesting
Runnable mRefreshRunnable = new Runnable() {
@Override
public void run() {
final Lock closeLock = mDatabase.getCloseLock();
boolean hasUpdatedTable = false;
if (hasUpdatedTable) {
synchronized (mObserverMap) {
for (Map.Entry<Observer, ObserverWrapper> entry : mObserverMap) {
entry.getValue().notifyByTableVersions(mTableInvalidStatus);
}
}
}
}
};
InvalidationTracker.ObserverWrapper # notifyByTableVersions()
void notifyByTableVersions(BitSet tableInvalidStatus) {
Set<String> invalidatedTables = null;
final int size = mTableIds.length;
for (int index = 0; index < size; index++) {
final int tableId = mTableIds[index];
if (tableInvalidStatus.get(tableId)) {
if (size == 1) {
// Optimization for a single-table observer
invalidatedTables = mSingleTableSet;
} else {
if (invalidatedTables == null) {
invalidatedTables = new ArraySet<>(size);
}
invalidatedTables.add(mTableNames[index]);
}
}
}
if (invalidatedTables != null) {
mObserver.onInvalidated(invalidatedTables);
}
}
验证
通过 android:process 我们可以最快的模拟跨进程的情况
<activity android:name=".ui.DetailActivity1"/>
<activity android:name=".ui.DetailActivity2"
android:process=":Process2" />
如图, 当在Process2中对database进行 insert/delete 写入操作时, Process1中的观察者也能响应跨进程的更新, 从而更新UI.
总结
Room 跨进程共享数据使用了两套 C/S 模型, 涉及到两个aidl类. Service 提供 ServiceBinder
, 每个 Client 也会提供 ClientBinder
. 大概流程 :
所有 Client 初始化会调用
ServiceBinder
的registerCallback()
方法将ClientBinder
传递给 Service, Service 内维护了mCallbackList
来保存所有的ClientBinder
.
当某个 Client 更新时, 会调用ServiceBinder
的broadcastInvalidation()
传播更新.Service 的
broadcastInvalidation()
会遍历ClientBinder
的集合mCallbackList
, 将更新交给每个 Client 对应的 InvalidationTracker 处理.InvalidationTracker 维护了对database数据的观察者集合
mObserverMap
, 遍历map调用observer#onInvalidated()
会触发再次查询的操作, 观察者就可以接收到新的数据.