





/**
 * Supplies the paged job list to its consumers.
 *
 * NOTE(review): this version builds one JobListDataSource in the constructor and returns that
 * same instance from every createDataSource() call. The closing braces and the body of
 * setQueryFilter are not visible in this excerpt.
 */
public class JobsPagedListProvider {

// The single data source instance, created once in the constructor.
private JobListDataSource<JobListItemEntity> mJobListDataSource;

/** Creates the provider and its one data source, backed by the given repository. */
public JobsPagedListProvider(JobsRepository jobsRepository) {
    mJobListDataSource = new JobListDataSource<>(jobsRepository);

/** Returns a LivePagedListProvider whose createDataSource() hands back the shared data source. */
public LivePagedListProvider<Integer, JobListItemEntity> jobList() {
    return new LivePagedListProvider<Integer, JobListItemEntity>() {
        protected DataSource<Integer, JobListItemEntity> createDataSource() {
            // Always the same field instance; no new data source is constructed here.
            return mJobListDataSource;

// Body not shown in this excerpt; presumably forwards the query to the data source — TODO confirm.
public void setQueryFilter(String query) {


/**
 * Tiled data source that loads pages of jobs through JobsRepository.
 *
 * NOTE(review): closing braces, the remainder of the constructor and the body of the
 * invalidation observer are not visible in this excerpt.
 */
public class JobListDataSource<T> extends TiledDataSource<T> {

private final JobsRepository mJobsRepository;
// Observer created for JobListItemEntity.TABLE_NAME; what onInvalidated does is not shown here.
private final InvalidationTracker.Observer mObserver;

// Filter text handed to JobsRepository.getJobs on every loadRange call; empty by default.
String query = "";

/** Stores the repository and creates the table invalidation observer. */
public JobListDataSource(JobsRepository jobsRepository) {
    mJobsRepository = jobsRepository;


    mObserver = new InvalidationTracker.Observer(JobListItemEntity.TABLE_NAME) {
        public void onInvalidated(@NonNull Set<String> tables) {


// Delegates straight to the superclass; adds no behavior of its own.
public boolean isInvalid() {
    return super.isInvalid();

// Reports DataSource.COUNT_UNDEFINED instead of an actual item count.
public int countItems() {
    return DataSource.COUNT_UNDEFINED;

// Loads one page by asking the repository for `count` jobs starting at `startPosition`.
public List<T> loadRange(int startPosition, int count) {
    // Unchecked cast: the repository result is cast to List<T> without any runtime check.
    return (List<T>) mJobsRepository.getJobs(query, startPosition, count);

// Replaces the filter text used by subsequent loadRange calls.
public void setQuery(String query) {
    this.query = query;

以下是 JobsRepository 中将作业从未保存更新为已保存的代码:

// Marks a job as saved: builds a new JobListItemEntity from the given one and sets isSaved = true.
// NOTE(review): the database write and the rest of the Completable chain (subscription,
// scheduling, any data source invalidation) are not visible in this excerpt.
public void saveJob(JobListItemEntity entity) {
    Completable.fromCallable(() -> {
        // Work on a new entity constructed from `entity`; the argument itself is not modified here.
        JobListItemEntity newJob = new JobListItemEntity(entity);
        newJob.isSaved = true;
        Timber.d("updating entity from " + entity.isSaved + " to "
                + newJob.isSaved); //this gets printed in log
        //insertion in db is happening as expected but UI is not receiving new list
        return null;


// Diff rules for job rows: identity is decided by jobID; content equality additionally
// compares the title and the saved flag.
private static final DiffCallback<JobListItemEntity> DIFF_CALLBACK =  new DiffCallback<JobListItemEntity>() {
    // Two rows are treated as the same job when their jobID fields compare equal with ==.
    // NOTE(review): if jobID is a String or boxed type, == compares references — confirm the field type.
    public boolean areItemsTheSame(@NonNull JobListItemEntity oldItem, @NonNull JobListItemEntity newItem) {
        return oldItem.jobID == newItem.jobID;

    // Contents match only when jobID, jobTitle (compareTo == 0) and isSaved all agree.
    public boolean areContentsTheSame(@NonNull JobListItemEntity oldItem, @NonNull JobListItemEntity newItem) {
        // Debug trace of the saved flags being compared.
        Timber.d(oldItem.isSaved + " comp with" + newItem.isSaved);
        return oldItem.jobID == newItem.jobID
                && oldItem.jobTitle.compareTo(newItem.jobTitle) == 0
                && oldItem.isSaved == newItem.isSaved;


// Repository fragment: only the data source reference and its setter are visible in this excerpt.
public class JobsRepository {
//holds an instance of datasource
// NOTE(review): declared with the raw JobListDataSource type (no type argument).
private JobListDataSource mJobListDataSource;

// Stores the data source reference; how it is used afterwards is not shown in this excerpt.
public void setJobListDataSource(JobListDataSource jobListDataSource) {
    mJobListDataSource = jobListDataSource;


JobsRepository 中的 getJobs() :

// Returns `count` jobs starting at offset `startPosition` from the local DAO.
// While isJobListInit is false it also prepares a network request for the matching page:
// blocking when the local page is empty, asynchronous otherwise.
// NOTE(review): `query` is not used in the visible lines, and the closing braces (including
// where the isJobListInit branch ends) are not visible in this excerpt.
public List<JobListItemEntity> getJobs(String query, int startPosition, int count) {
    if (!isJobListInit) {

        // Page number is derived from the offset with integer division, plus one.
        // NOTE(review): count == 0 would throw ArithmeticException on this division.
        Observable<JobList> jobListObservable = mApiService.getOpenJobList(
                mRequestJobList.setPageNo(startPosition/count + 1)

        List<JobListItemEntity> jobs = mJobDao.getJobsLimitOffset(count, startPosition);

        //make a synchronous network call since we have no data in db to return
        if(jobs.size() == 0) {
            // Blocks the calling thread until the observable emits its single JobList.
            JobList jobList = jobListObservable.blockingSingle();
            updateJobList(jobList, startPosition);
        } else {
            //make an async call and return cached version meanwhile
            jobListObservable.subscribe(new Observer<JobList>() {
                public void onSubscribe(Disposable d) {


                // Applies the fetched list once the network response arrives.
                public void onNext(JobList jobList) {
                    updateJobList(jobList, startPosition);

                // Body not visible in this excerpt.
                public void onError(Throwable e) {

                public void onComplete() {


    // Re-query the DAO so the returned page reflects any update applied above.
    return mJobDao.getJobsLimitOffset(count, startPosition);

JobsRepository 中的 updateJobList():

// Handles a fetched JobList, starting by extracting its array of job entities.
// NOTE(review): only the first statement is visible in this excerpt; how the entities and
// startPosition are used afterwards is not shown.
private void updateJobList(JobList jobList, int startPosition) {
    JobListItemEntity[] jobs = jobList.getJobsData();

阅读 DataSource 的源代码后我意识到:

  1. 数据源一旦失效将永远不会再次有效。
  2. invalidate() 的文档说明:如果已经调用过 invalidate(),则再次调用此方法不会执行任何操作。

我的自定义数据源(JobListDataSource)实际上是一个单例,由 JobsPagedListProvider 提供。因此,当我在 saveJob()(定义于 JobsRepository)中使 DataSource 失效时,分页库会尝试获取一个新的 DataSource 实例,并通过再次调用 loadRange() 来获取最新数据——这正是刷新数据源的工作方式。但由于我的 DataSource 是单例,而且它已经失效,所以 loadRange() 不会再被调用!



不再使用单例的 JobsPagedListProvider:

/**
 * Supplies the paged job list; this version constructs a new JobListDataSource on every
 * createDataSource() call instead of reusing one instance.
 *
 * NOTE(review): closing braces and the body of setQueryFilter are not visible in this excerpt.
 */
public class JobsPagedListProvider {

// Most recently created data source; reassigned on each createDataSource() call.
private JobListDataSource<JobListItemEntity> mJobListDataSource;

// Repository passed to every data source this provider creates.
private final JobsRepository mJobsRepository;

/** Stores the repository; no data source is created at construction time. */
public JobsPagedListProvider(JobsRepository jobsRepository) {
    mJobsRepository = jobsRepository;

/** Returns a LivePagedListProvider that builds a fresh JobListDataSource per request. */
public LivePagedListProvider<Integer, JobListItemEntity> jobList() {
    return new LivePagedListProvider<Integer, JobListItemEntity>() {
        protected DataSource<Integer, JobListItemEntity> createDataSource() {
            //always return a new instance, because if DataSource gets invalidated a new instance will be required(that's how refreshing a DataSource works)
            mJobListDataSource = new JobListDataSource<>(mJobsRepository);
            return mJobListDataSource;

// Body not shown in this excerpt; presumably forwards the query to the data source — TODO confirm.
public void setQueryFilter(String query) {

另外,如果您从网络获取数据,请确保在查询网络之前有正确的逻辑来检查数据是否已过时,否则每次数据源失效时都会重新请求网络。我通过在 JobEntity 中添加一个 insertAt 字段解决了这个问题:它记录该条目插入数据库的时间,并在 JobsRepository 的 getJobs() 中据此检查数据是否过时。

以下是 getJobs() 的代码:

// Returns `count` jobs starting at offset `startPosition` from the local DAO.
// Fetches from the network first (blocking) when the local page is empty, or triggers a
// background refresh when shouldRefetchJobList(jobs) returns true; otherwise no request is subscribed.
// NOTE(review): `query` is not used in the visible lines, and closing braces are not visible
// in this excerpt.
public List<JobListItemEntity> getJobs(String query, int startPosition, int count) {
    // Page number is derived from the offset with integer division, plus one.
    // NOTE(review): count == 0 would throw ArithmeticException on this division.
    Observable<JobList> jobListObservable = mApiService.getOpenJobList(
            mRequestJobList.setPageNo(startPosition / count + 1)

    List<JobListItemEntity> jobs = mJobDao.getJobsLimitOffset(count, startPosition);

    //no data in db, make a synchronous call to network to get the data
    if (jobs.size() == 0) {
        // Blocks the calling thread until the observable emits its single JobList.
        JobList jobList = jobListObservable.blockingSingle();
        // Third argument is false on this synchronous path; its meaning is not shown in this excerpt.
        updateJobList(jobList, startPosition, false);
    } else if (shouldRefetchJobList(jobs)) {
        //data available in db, so show a cached version and make async network call to update data
        jobListObservable.subscribe(new Observer<JobList>() {
            public void onSubscribe(Disposable d) {


            // Applies the fetched list once the response arrives; passes true on this async path.
            public void onNext(JobList jobList) {
                updateJobList(jobList, startPosition, true);

            // Body not visible in this excerpt.
            public void onError(Throwable e) {

            public void onComplete() {


    // Re-query the DAO so the returned page reflects any update applied above.
    return mJobDao.getJobsLimitOffset(count, startPosition);

最后,删除 JobListDataSource 中的 InvalidationTracker,因为我们现在手动处理失效:

/**
 * Tiled data source that loads pages of jobs through JobsRepository.
 * This version holds no InvalidationTracker observer.
 *
 * NOTE(review): closing braces are not visible in this excerpt.
 */
public class JobListDataSource<T> extends TiledDataSource<T> {

private final JobsRepository mJobsRepository;

// Filter text handed to JobsRepository.getJobs on every loadRange call; empty by default.
String query = "";

/** Stores the repository used to load pages. */
public JobListDataSource(JobsRepository jobsRepository) {
    mJobsRepository = jobsRepository;

// Reports DataSource.COUNT_UNDEFINED instead of an actual item count.
public int countItems() {
    return DataSource.COUNT_UNDEFINED;

// Loads one page by asking the repository for `count` jobs starting at `startPosition`.
public List<T> loadRange(int startPosition, int count) {
    // Unchecked cast: the repository result is cast to List<T> without any runtime check.
    return (List<T>) mJobsRepository.getJobs(query, startPosition, count);

// Replaces the filter text used by subsequent loadRange calls.
public void setQuery(String query) {
    this.query = query;

