0
点赞
收藏
分享

微信扫一扫

AAC-Room 跨进程分析

江南北 2021-09-27 阅读 52
日记本

SQLite是安卓数据持久化的重要手段. 为了便捷的操作sql, 衍生了很多ORM (Object/Relation Mapping 对象关系映射). Android端比较流行的有 GreenDao, Realm, LitePal等等.

Google在 AAC 中也推出了自己的 ORM 框架 Room. 除了基础的 runtime 包, apt注解包. 还提供了对 AAC LiveData 和老牌 RxJava 两种观察者模式框架的扩展.

dependencies {
  def room_version = "2.2.0-rc01"

  implementation "androidx.room:room-runtime:$room_version"  //Room

  //编译期注解, kotlin使用kapt代替annotationProcessor
  annotationProcessor "androidx.room:room-compiler:$room_version" 
  kapt "androidx.room:room-compiler:$rroom_version"  

  // ktx扩展和协程
  implementation "androidx.room:room-ktx:$room_version"

  // Room的RxJava扩展
  implementation "androidx.room:room-rxjava2:$room_version"

  // Room的的Google guava扩展
  implementation "androidx.room:room-guava:$room_version"

  // Test helpers
  testImplementation "androidx.room:room-testing:$room_version"
}

和别的 ORM 框架一样, Room 使用了大量编译时注解帮助我们生成模板代码 (Java/annotationProcesser 和 Kotlin/kapt). 包括主要的类注解 :

  • DataBase : 数据库实例
  • Dao : 数据访问对象, 维护了数据库增删改查的方法
  • Entity : 映射表结构的对象

使用方法和别的 ORM 差异不大, 具体参考官方文档. 这里介绍的是 Room 的数据跨进程共享的实现.

ContentProvider

提到数据库跨进程, 首先想到的肯定是ContentProvider, 他用了两套C/S模型:

  1. 多进程操作数据库
    多个进程(客户端)连接ContentProvider(服务端), ContentProvider 提供Binder实现给客户端调用.
  2. 跨进程数据共享
    当一个进程操作ContentProvider变更数据之后,可能希望其他进程能收到通知.
    客户端通过 getContentResolver().registerContentObserver() 注册ContentObserver, 他提供 Binder 并把 Binder 交给系统级别的服务 ContentService, 更新就能通过服务端分发到各个客户端.
Room

Room 也支持跨进程数据共享, 只需在构造对象的时候添加 enableMultiInstanceInvalidation() 选项

@Synchronized
fun getDataBase(): AppDataBase {
    return Room.databaseBuilder(
        App.instance(),
        AppDataBase::class.java, "databasenameXXXX"
    )
        .enableMultiInstanceInvalidation()
        .build()
}

他的实现原理是什么呢? 看官网的一段注释 :

如果你的APP运行在单进程中, 你应该设置 Room 为单例, 因为获取 RoomDatabase 实例的成本较高, 且很少需要在一个进程中创建多个实例.

如果你的APP是多进程的, 在构造 database 时配置enableMultiInstanceInvalidation()选项, 那么不同进程就会根据同一个db文件创建属于各自进程的单独实例. 并且当一个进程中的实例失效(发生变化)时, 会自动将失效传播到别的进程中.

初始化

db.init() 初始化时, 将多进程的处理交给 InvalidationTracker 对象.

public abstract class RoomDatabase {

    public static class Builder<T extends RoomDatabase> {

        private boolean mMultiInstanceInvalidation;
        @NonNull
        public Builder<T> enableMultiInstanceInvalidation() {
            mMultiInstanceInvalidation = mName != null;
            return this;
        }

        public T build() {
            ....
            //DatabaseConfiguration保存配置项
            DatabaseConfiguration configuration =
                    new DatabaseConfiguration(....mMultiInstanceInvalidation....);
            T db = Room.getGeneratedImplementation(mDatabaseClass, DB_IMPL_SUFFIX);
            db.init(configuration);  //初始化db
            return db;
        }
    }

    @CallSuper
    public void init(@NonNull DatabaseConfiguration configuration) {
        mOpenHelper = createOpenHelper(configuration);
        ....
        //如果配置了多进程选项
        if (configuration.multiInstanceInvalidation) {
            //使用 mInvalidationTracker 来处理多进程的情况
            mInvalidationTracker.startMultiInstanceInvalidation(configuration.context,
                    configuration.name);
        }
    }
}

InvalidationTracker 在apt帮我们生成的 RoomDatabase 实现 AppDataBase_Impl 中创建. 他和 RoomDatabase 对象是一对一的关系.

public final class AppDataBase_Impl extends AppDataBase {
  @Override
  protected InvalidationTracker createInvalidationTracker() {
    final HashMap<String, String> _shadowTablesMap = new HashMap<String, String>(0);
    HashMap<String, Set<String>> _viewTables = new HashMap<String, Set<String>>(0);
    return new InvalidationTracker(this, _shadowTablesMap, _viewTables, "nodemodel","user_profile");
  }
}

mInvalidationTracker.startMultiInstanceInvalidation 中创建了 MultiInstanceInvalidationClient 对象.


public class InvalidationTracker {
    private MultiInstanceInvalidationClient mMultiInstanceInvalidationClient;
    //name是数据库的名字, executor是一个异步的线程池
    void startMultiInstanceInvalidation(Context context, String name) {
        mMultiInstanceInvalidationClient = new MultiInstanceInvalidationClient(context, name, this,
                mDatabase.getQueryExecutor());
    }
}

MultiInstanceInvalidationClient 的构造方法中使用了 bindService() 连接MultiInstanceInvalidationService, 这里使用了绑定服务来进行一个 Service 和多个进程的 Client 的IPC.

/**
 * Handles all the communication from {@link RoomDatabase} and {@link InvalidationTracker} to
 * {@link MultiInstanceInvalidationService}.
 */
class MultiInstanceInvalidationClient {

    MultiInstanceInvalidationClient(Context context, String name,
            InvalidationTracker invalidationTracker, Executor executor) {
        mContext = context.getApplicationContext();
        mName = name;
        mInvalidationTracker = invalidationTracker;
        mExecutor = executor;
        mObserver = new InvalidationTracker.Observer(invalidationTracker.mTableNames) {
            @Override
            public void onInvalidated(@NonNull Set<String> tables) {
                if (mStopped.get()) {
                    return;
                }
                try {
                    mService.broadcastInvalidation(mClientId,
                            tables.toArray(new String[0]));
                } catch (RemoteException e) {
                    Log.w(Room.LOG_TAG, "Cannot broadcast invalidation", e);
                }
            }

            @Override
            boolean isRemote() {
                return true;
            }
        };
        Intent intent = new Intent(mContext, MultiInstanceInvalidationService.class);
        mContext.bindService(intent, mServiceConnection, Context.BIND_AUTO_CREATE);
    }
}

绑定服务介绍 :
创建提供绑定的服务时,您必须提供 IBinder,进而提供编程接口,以便客户端使用此接口与服务进行交互. 您可以通过三种方法定义接口 :

Room 中使用的是 AIDL, 使用必须执行以下步骤:

  1. 创建 aidl 接口文件, 在文件内声明方法
  2. 编译时会根据 adil 生成 XXInterface.java 接口文件 , 内部类 XXInterface.Stub 实现了 IBinder 接口并声明了 aidl 内定义的方法
  3. Service 的 public IBinder onBind(Intent intent) 方法 , 返回一个 XXInterface.Stub 的实现类实例, 其定义了服务的远程过程调用 (RPC) 接口, 给客户端调用.
  4. Client 调用 bindService() 连接 Service, 在连接成功的 onServiceConnected(ComponentName name, IBinder service) 中将 Service 的 IBinder 转换为 XXInterface 对象. 通过这个对象就可以调用 Service 中定义的方法实现多个 Client 和 Service 的跨进程通信.

MultiInstanceInvalidationClient

对于同一个db文件, 每个进程可以创建自己的 RoomDatabase 对象. 在 RoomDataBase 中创建了 InvalidationTracker, 在 InvalidationTracker 创建了 MultiInstanceInvalidationClient.

Client 的构造方法中通过 bindService() 绑定服务,Service#Binder 向 Client 提供跨进程调用的三个方法 :

  • 绑定服务成功后调用 Service#Binder 的 service.registerCallback() 方法.
  • 在构造中的 mObserver 里调用 mService.broadcastInvalidation().
  • 在客户端的 stop() 方法中调用 service.unregisterCallback(mCallback, mClientId)
class MultiInstanceInvalidationClient {
    @Nullable
    IMultiInstanceInvalidationService mService;  //Service中的Binder对象

    MultiInstanceInvalidationClient(Context context, String name,
            InvalidationTracker invalidationTracker, Executor executor) {
        mContext = context.getApplicationContext();
        mName = name;
        mInvalidationTracker = invalidationTracker;
        mExecutor = executor;
        mObserver = new InvalidationTracker.Observer(invalidationTracker.mTableNames) {
            @Override
            public void onInvalidated(@NonNull Set<String> tables) {
                if (mStopped.get()) {
                    return;
                }
                try {
                    //Client 跨进程调用 Service 的传播更新方法
                    mService.broadcastInvalidation(mClientId,
                            tables.toArray(new String[0]));
                } catch (RemoteException e) {
                    Log.w(Room.LOG_TAG, "Cannot broadcast invalidation", e);
                }
            }

            @Override
            boolean isRemote() {
                return true;
            }
        };
        //绑定服务
        Intent intent = new Intent(mContext, MultiInstanceInvalidationService.class);
        mContext.bindService(intent, mServiceConnection, Context.BIND_AUTO_CREATE);
    }

    final ServiceConnection mServiceConnection = new ServiceConnection() {

        @Override
        public void onServiceConnected(ComponentName name, IBinder service) {
            mService = IMultiInstanceInvalidationService.Stub.asInterface(service);
            //连接服务端成功
            mExecutor.execute(mSetUpRunnable);
        }

        @Override
        public void onServiceDisconnected(ComponentName name) {
            mExecutor.execute(mRemoveObserverRunnable);
            mService = null;
            mContext = null;
        }
    };

    final Runnable mSetUpRunnable = new Runnable() {
        @Override
        public void run() {
            try {
                final IMultiInstanceInvalidationService service = mService;
                if (service != null) {
                    //调用Service#Binder的 registerCallback() 方法
                    mClientId = service.registerCallback(mCallback, mName);
                    //mInvalidationTracker添加Observer
                    mInvalidationTracker.addObserver(mObserver);
                }
            } catch (RemoteException e) {  
                Log.w(Room.LOG_TAG, "Cannot register multi-instance invalidation callback", e);
            }
        }
    };

    final Runnable mRemoveObserverRunnable = new Runnable() {
        @Override
        public void run() {
            //mInvalidationTracker移除Observer
            mInvalidationTracker.removeObserver(mObserver);
        }
    };
}

MultiInstanceInvalidationService

MultiInstanceInvalidationService 作为服务端可以向多个进程中的客户端MultiInstanceInvalidationClient 提供绑定服务.
IMultiInstanceInvalidationService.aidl 作为 AIDL 接口, 定义了三个接口方法给 Service 实现 :

interface IMultiInstanceInvalidationService {
    //注册回调
    int registerCallback(IMultiInstanceInvalidationCallback callback, String name);
    //反注册回调
    void unregisterCallback(IMultiInstanceInvalidationCallback callback, int clientId);
    //传播失效, 即进程间同步
    oneway void broadcastInvalidation(int clientId, in String[] tables);
}

1. registerCallback() 注册

registerCallback() 涉及到两个重要的变量 mCallbackListmClientNames.

  • mCallbackList 是个列表, 用于保存服务端对多个远程客户端的callBack. 整个通知更新的过程分为两步:
    Client 通过 Service#Binder 和 Service 建立连接并调用 Service#BinderbroadcastInvalidation()方法让 Service 来传播更新.
    Service 通知其他 Client 更新的过程也涉及到 IPC, 这次由每个 Client 提供 Client#Binder 供 Serivce 调用, mCallbackList 就是用于保存每个客户端的 Binder.
public class MultiInstanceInvalidationService extends Service {
    final RemoteCallbackList<IMultiInstanceInvalidationCallback> mCallbackList =
            new RemoteCallbackList<IMultiInstanceInvalidationCallback>() {
                @Override
                public void onCallbackDied(IMultiInstanceInvalidationCallback callback,
                        Object cookie) {
                    //如果Client的Binder挂掉了, 就从mClientNames移除
                    mClientNames.remove((int) cookie);
                }
            };
    
    // Service 提供给Client的 Binder : IMultiInstanceInvalidationService.Stub
    private final IMultiInstanceInvalidationService.Stub mBinder =
            new IMultiInstanceInvalidationService.Stub() {

                // Assigns a client ID to the client.
                @Override
                public int registerCallback(IMultiInstanceInvalidationCallback callback,
                        String name) {
                    if (name == null) {
                        return 0;
                    }
                    synchronized (mCallbackList) {
                        int clientId = ++mMaxClientId;
                        // Use the client ID as the RemoteCallbackList cookie.
                        if (mCallbackList.register(callback, clientId)) {
                            mClientNames.append(clientId, name);
                            return clientId;
                        } else {
                            --mMaxClientId;
                            return 0;
                        }
                    }
                }

}

// 客户端提供Binder: IMultiInstanceInvalidationCallback.Stub 给服务端调用
class MultiInstanceInvalidationClient {
    final IMultiInstanceInvalidationCallback mCallback =
            new IMultiInstanceInvalidationCallback.Stub() {
                @Override
                public void onInvalidation(final String[] tables) {
                    mExecutor.execute(new Runnable() {
                        @Override
                        public void run() {
                            mInvalidationTracker.notifyObserversByTableNames(tables);
                        }
                    });
                }
            };
}

  • mClientNames 使用 SparseArrayCompat<String> 保存所有连接他的客户端名称, 调用 mClientNames.append(clientId, name) 把客户端加入集合.
    key : clientId 由Service分配, 每次+1.
    value : clientName 就是 RoomDatabase 数据库的名字.

2. unregisterCallback() 反注册

mCallbackList, mClientNames 集合移除对应的客户端.

@Override
public void unregisterCallback(IMultiInstanceInvalidationCallback callback,
        int clientId) {
    synchronized (mCallbackList) {
        mCallbackList.unregister(callback);
        mClientNames.remove(clientId);
    }
}

3. broadcastInvalidation 传播更新(将一个客户端的变化传播到别的客户端)

callback 作为 Client#Binder, 当 Client 调用过 Service#Binder 的 registerCallback() 后, 就将 callBack 添加到了 Service 的 mCallbackList 中.

当 Client 调用 Service#Binder 的 broadcastInvalidation() 传播更新时, 遍历 mCallbackList 调用 Client#Binder 的 callback.onInvalidation() 就将更新的逻辑通知到了各个 Client 的 onInvalidation() 实现.

// Broadcasts table invalidation to other instances of the same database file.
// The broadcast is not sent to the caller itself.
@Override
public void broadcastInvalidation(int clientId, String[] tables) {
    //服务端和客户端是1/N的关系, 存在竞态, 方法加锁
    synchronized (mCallbackList) {
        String name = mClientNames.get(clientId);
        if (name == null) {
            Log.w(Room.LOG_TAG, "Remote invalidation client ID not registered");
            return;
        }
        int count = mCallbackList.beginBroadcast();
        try {
            //遍历mCallbackList
            for (int i = 0; i < count; i++) {
                int targetClientId = (int) mCallbackList.getBroadcastCookie(i);
                String targetName = mClientNames.get(targetClientId);  
                //调用这个方法的客户端不需要参与同步, name不同即不是一个数据库文件的客户端不需要同步
                if (clientId == targetClientId // This is the caller itself.
                        || !name.equals(targetName)) { // Not the same file.
                    continue;
                }
                try {
                    //这里涉及到服务端调用客户端的过程, 也是通过aidl
                    IMultiInstanceInvalidationCallback callback =
                            mCallbackList.getBroadcastItem(i);
                    callback.onInvalidation(tables);
                } catch (RemoteException e) {
                    Log.w(Room.LOG_TAG, "Error invoking a remote callback", e);
                }
            }
        } finally {
            mCallbackList.finishBroadcast();
        }
    }
}

遍历时过滤掉不需要同步的客户端

  1. clientId == targetClientId
    发起传播的客户端肯定不需要通知自己
  2. !name.equals(targetName)
    一个APP可以有多个本地db文件, 我们需要同步的是同一个db文件在不同进程中的 RoomDatabase 对象对应的 MultiInstanceInvalidationClient (他的name就是数据库的名字).
    如果 name 不同, 意味着不是同一个db文件的Client, 不需要参与同步.

传播更新

传播更新由 MultiInstanceInvalidationService 调用 MultiInstanceInvalidationClient 中的 Binder 方法来实现, 同样是通过AIDL.

IMultiInstanceInvalidationCallback.aidl 只定义了一个方法 :

interface IMultiInstanceInvalidationCallback {
    oneway void onInvalidation(in String[] tables);
}

MultiInstanceInvalidationClient 中的实现 :

class MultiInstanceInvalidationClient {

    final IMultiInstanceInvalidationCallback mCallback =
            new IMultiInstanceInvalidationCallback.Stub() {
                @Override
                public void onInvalidation(final String[] tables) {
                    mExecutor.execute(new Runnable() {
                        @Override
                        public void run() {
                            //将数据库中表的更新通知给观察者
                            mInvalidationTracker.notifyObserversByTableNames(tables);
                        }
                    });
                }
            };
}

所以 MultiInstanceInvalidationService # broadcastInvalidation()MultiInstanceInvalidationClient # callback.onInvalidation()mInvalidationTracker.notifyObserversByTableNames(tables), 最终更新会交给每个Client 对应的 InvalidationTracker 处理.

接收更新的MultiInstanceInvalidationClient

InvalidationTracker 中掉用 notifyObserversByTableNames(String... tables) 通知观察者的集合 mObserverMap.

public class InvalidationTracker {

    public void notifyObserversByTableNames(String... tables) {
        synchronized (mObserverMap) {
            //mObserverMap 中保存了当前进程的数据库观察者
            for (Map.Entry<Observer, ObserverWrapper> entry : mObserverMap) {
                //!entry.getKey().isRemote() 排除了Client构造方法内的Observer
                if (!entry.getKey().isRemote()) {
                    //这里的tables就是当前进程修改过, 需要同步到别的进程的表
                    entry.getValue().notifyByTableNames(tables);
                }
            }
        }
    }

    static class ObserverWrapper {
        void notifyByTableNames(String[] tables) {
            Set<String> invalidatedTables = null;
            if (mTableNames.length == 1) {
                for (String table : tables) {
                    if (table.equalsIgnoreCase(mTableNames[0])) {
                        // Optimization for a single-table observer
                        invalidatedTables = mSingleTableSet;
                        break;
                    }
                }
            } else {
                ArraySet<String> set = new ArraySet<>();
                for (String table : tables) {  //遍历传播更新的Client修改过的表
                    for (String ourTable : mTableNames) { //遍历当前Client的所有表
                        if (ourTable.equalsIgnoreCase(table)) { //只把需要更新的表加入set集合
                            set.add(ourTable);
                            break;
                        }
                    }
                }
                if (set.size() > 0) {
                    invalidatedTables = set;
                }
            }
            if (invalidatedTables != null) {  //通知观察者更新表
                mObserver.onInvalidated(invalidatedTables);
            }
        }
    }
}

Q: 那么mObserverMap 中的 InvalidationTracker.Observer 在哪里被添加呢?**
A: InvalidationTracker.Observer 就是观察者, 他是一个抽象类, 在 LiveData 和 RxJava 两种观察者模式框架中都提供了他的实现.

我们以 RxJava 为例, 举个例子 :

@Dao
interface NodeModelDao {
    @Query("SELECT * from nodemodel")
    fun getAll(): Flowable<List<NodeModel>>
}

查看 kapt 编译后生成的 NodeModelDaoImpl.java 文件, 可以看到 Room 帮助我们在Callablecall() 方法中生成了查询的Sql操作.

public final class NodeModelDao_Impl implements NodeModelDao {
  @Override
  public Flowable<List<NodeModel>> getAll() {
    final String _sql = "SELECT * from nodemodel";
    final RoomSQLiteQuery _statement = RoomSQLiteQuery.acquire(_sql, 0);
    return RxRoom.createFlowable(__db, new String[]{"nodemodel"}, new Callable<List<NodeModel>>() {
      @Override
      public List<NodeModel> call() throws Exception {
        final Cursor _cursor = DBUtil.query(__db, _statement, false);
        try {
            .....
            _item = new NodeModel(_tmpUid,_tmpId,_tmpName,_tmpTitle,_tmpTitleAlternative,_tmpUrl,_tmpTopics,_tmpHeader,_tmpFooter,_tmpIsCollected);
            _result.add(_item);
          return _result;
        } finally {
          _cursor.close();
        }
      }
  }
}

查看 RxRoom.createFlowable 方法, 他的内部会创建一个 InvalidationTracker.Observer 实例 observer, 接着调用 database.getInvalidationTracker().addObserver(observer)observer 添加到 InvalidationTracker 的 mObserverMap 中. observeronInvalidated() 方法会调用 emitter.onNext(NOTHING) 再次发射, 发射后会调用 flatMapMaybe() 转换流为 maybe 返回给观察者, 而 maybe = Maybe.fromCallable(callable), Callable 内的 call() 实现就是上面提到的Sql查询操作.

public class RxRoom {

    @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
    public static <T> Flowable<T> createFlowable(final RoomDatabase database,
            final String[] tableNames, final Callable<T> callable) {
        Scheduler scheduler = Schedulers.from(database.getQueryExecutor());
        final Maybe<T> maybe = Maybe.fromCallable(callable);
        return createFlowable(database, tableNames)
                .observeOn(scheduler)
                .flatMapMaybe(new Function<Object, MaybeSource<T>>() {
                    @Override
                    public MaybeSource<T> apply(Object o) throws Exception {
                        return maybe;
                    }
                });
    }

    public static Flowable<Object> createFlowable(final RoomDatabase database,
            final String... tableNames) {
        return Flowable.create(new FlowableOnSubscribe<Object>() {
            @Override
            public void subscribe(final FlowableEmitter<Object> emitter) throws Exception {
                //创建InvalidationTracker.Observer实例
                final InvalidationTracker.Observer observer = new InvalidationTracker.Observer(
                        tableNames) {
                    @Override
                    public void onInvalidated(@androidx.annotation.NonNull Set<String> tables) {
                        if (!emitter.isCancelled()) {
                            //再次发射
                            emitter.onNext(NOTHING);
                        }
                    }
                };
                if (!emitter.isCancelled()) {
                    database.getInvalidationTracker().addObserver(observer);
                    emitter.setDisposable(Disposables.fromAction(new Action() {
                        @Override
                        public void run() throws Exception {
                            database.getInvalidationTracker().removeObserver(observer);
                        }
                    }));
                }

                // emit once to avoid missing any data and also easy chaining
                if (!emitter.isCancelled()) {
                    emitter.onNext(NOTHING);
                }
            }
        }, BackpressureStrategy.LATEST);
    }
}

当别的进程的 Client 修改了数据, Service 会将更新传播到所有进程相同名字的 Client. 每个进程的 Client 对应的 InvalidationTracker 会将需要更新的表 tables 交给内部维护的集合 mObserverMap 处理.

InvalidationTracker.Observer 是个抽象类, 我们使用 LiveData/RxJava 作为 Room 查询的观察者实现时, 会实现 InvalidationTracker.Observer 对象并把它添加到 InvalidationTracker 的 mObserverMap 集合中. 遍历集合并调用 InvalidationTracker.Observer 的 onInvalidated() 实现, 就可以让上游再次发送消息, 再次触发下游观察者的订阅, 到这里更新的流程就结束了.

发起更新的MultiInstanceInvalidationClient

我们已经完成了 Service 通过 Client#Binder 向各个 Client 传播更新的IPC流程, 以及 每个 Client 的 InvalidationTracker 又通过 mObserverMap 通知自己的观察者的流程. 接下来我们只要知道 Service#broadcastInvalidation() 在什么情况下会被调用.

查看Client的构造方法, 这里初始化的 InvalidationTracker.Observer 的 onInvalidated() 中调用了 mService.broadcastInvalidation(). 所以只需要知道这个 observer 的方法什么时候被调用.

MultiInstanceInvalidationClient(Context context, String name,
            InvalidationTracker invalidationTracker, Executor executor) {
        mContext = context.getApplicationContext();
        mName = name;
        mInvalidationTracker = invalidationTracker;
        mExecutor = executor;
        mObserver = new InvalidationTracker.Observer(invalidationTracker.mTableNames) {
            @Override
            public void onInvalidated(@NonNull Set<String> tables) {
                if (mStopped.get()) {
                    return;
                }
                try {
                    mService.broadcastInvalidation(mClientId,
                            tables.toArray(new String[0]));
                } catch (RemoteException e) {
                    Log.w(Room.LOG_TAG, "Cannot broadcast invalidation", e);
                }
            }

            @Override
            boolean isRemote() {
                return true;
            }
        };
        Intent intent = new Intent(mContext, MultiInstanceInvalidationService.class);
        mContext.bindService(intent, mServiceConnection, Context.BIND_AUTO_CREATE);
    }

查看apt编译后生成的XXDao_Impl, 可以看到包括插入,删除,修改等涉及到写的操作都是通过事务 transcation 来完成的. 这很好理解 写入操作要保证原子性.

public final class NodeModelDao_Impl implements NodeModelDao {
  @Override
  public void insertAll(List<NodeModel> userEntities) {
    __db.beginTransaction();
    try {
      __insertionAdapterOfNodeModel.insert(userEntities);
      __db.setTransactionSuccessful();
    } finally {
      __db.endTransaction();
    }
  }

  @Override
  public void deleteAll() {
    final SupportSQLiteStatement _stmt = __preparedStmtOfDeleteAll.acquire();
    __db.beginTransaction();
    try {
      _stmt.executeUpdateDelete();
      __db.setTransactionSuccessful();
    } finally {
      __db.endTransaction();
      __preparedStmtOfDeleteAll.release(_stmt);
    }
  }
}

RoomDatabase # endTransaction() 在事务结束的方法中, 调用了 mInvalidationTracker.refreshVersionsAsync()

public void endTransaction() {
    mOpenHelper.getWritableDatabase().endTransaction();
    if (!inTransaction()) {
        // enqueue refresh only if we are NOT in a transaction. Otherwise, wait for the last
        // endTransaction call to do it.
        mInvalidationTracker.refreshVersionsAsync();
    }
}

InvalidationTracker # refreshVersionsAsync() , 遍历 mObserverMap, map里包含两种Observer

  • Client 构造方法内用于IPC调用 mService.broadcastInvalidation() 的Observer
  • Client 内做查询操作的观察者的Observer
public void refreshVersionsAsync() {
    // TODO we should consider doing this sync instead of async.
    if (mPendingRefresh.compareAndSet(false, true)) {
        mDatabase.getQueryExecutor().execute(mRefreshRunnable);
    }
}

@VisibleForTesting
Runnable mRefreshRunnable = new Runnable() {
    @Override
    public void run() {
        final Lock closeLock = mDatabase.getCloseLock();
        boolean hasUpdatedTable = false;

        if (hasUpdatedTable) {
            synchronized (mObserverMap) {
                for (Map.Entry<Observer, ObserverWrapper> entry : mObserverMap) {
                    entry.getValue().notifyByTableVersions(mTableInvalidStatus);
                }
            }
        }
    }
};

InvalidationTracker.ObserverWrapper # notifyByTableVersions()

void notifyByTableVersions(BitSet tableInvalidStatus) {
    Set<String> invalidatedTables = null;
    final int size = mTableIds.length;
    for (int index = 0; index < size; index++) {
        final int tableId = mTableIds[index];
        if (tableInvalidStatus.get(tableId)) {
            if (size == 1) {
                // Optimization for a single-table observer
                invalidatedTables = mSingleTableSet;
            } else {
                if (invalidatedTables == null) {
                    invalidatedTables = new ArraySet<>(size);
                }
                invalidatedTables.add(mTableNames[index]);
            }
        }
    }
    if (invalidatedTables != null) {
        mObserver.onInvalidated(invalidatedTables);
    }
}

验证

通过 android:process 我们可以最快的模拟跨进程的情况

<activity android:name=".ui.DetailActivity1"/>
<activity android:name=".ui.DetailActivity2"
    android:process=":Process2" />

如图, 当在Process2中对database进行 insert/delete 写入操作时, Process1中的观察者也能响应跨进程的更新, 从而更新UI.


总结

Room 跨进程共享数据使用了两套 C/S 模型, 涉及到两个aidl类. Service 提供 ServiceBinder, 每个 Client 也会提供 ClientBinder. 大概流程 :

  1. 所有 Client 初始化会调用 ServiceBinderregisterCallback() 方法将 ClientBinder 传递给 Service, Service 内维护了 mCallbackList 来保存所有的 ClientBinder .
    当某个 Client 更新时, 会调用 ServiceBinderbroadcastInvalidation() 传播更新.

  2. Service 的 broadcastInvalidation() 会遍历 ClientBinder 的集合 mCallbackList, 将更新交给每个 Client 对应的 InvalidationTracker 处理.

  3. InvalidationTracker 维护了对database数据的观察者集合 mObserverMap, 遍历map调用 observer#onInvalidated() 会触发再次查询的操作, 观察者就可以接收到新的数据.

举报

相关推荐

0 条评论