Unverified Commit ce80b76d authored by Viacheslav Dubeyko's avatar Viacheslav Dubeyko Committed by Christian Brauner
Browse files

ceph: introduce ceph_process_folio_batch() method



First step of ceph_writepages_start() logic is
of finding the dirty memory folios and processing it.
This patch introduces ceph_process_folio_batch()
method that moves this logic into dedicated method.

The ceph_writepages_start() has this logic:

if (ceph_wbc.locked_pages == 0)
    lock_page(page);  /* first page */
else if (!trylock_page(page))
    break;

<skipped>

if (folio_test_writeback(folio) ||
    folio_test_private_2(folio) /* [DEPRECATED] */) {
      if (wbc->sync_mode == WB_SYNC_NONE) {
          doutc(cl, "%p under writeback\n", folio);
          folio_unlock(folio);
          continue;
      }
      doutc(cl, "waiting on writeback %p\n", folio);
      folio_wait_writeback(folio);
      folio_wait_private_2(folio); /* [DEPRECATED] */
}

The problem here that folio/page is locked here at first
and it is by set_page_writeback(page) later before
submitting the write request. The folio/page is unlocked
by writepages_finish() after finishing the write
request. It means that logic of checking folio_test_writeback()
and folio_wait_writeback() never works because page is locked
and it cannot be locked again until write request completion.
However, for majority of folios/pages the trylock_page()
is used. As a result, multiple threads can try to lock the same
folios/pages multiple times even if they are under writeback
already. It makes this logic more compute intensive than
it is necessary.

This patch changes this logic:

if (folio_test_writeback(folio) ||
    folio_test_private_2(folio) /* [DEPRECATED] */) {
      if (wbc->sync_mode == WB_SYNC_NONE) {
          doutc(cl, "%p under writeback\n", folio);
          folio_unlock(folio);
          continue;
      }
      doutc(cl, "waiting on writeback %p\n", folio);
      folio_wait_writeback(folio);
      folio_wait_private_2(folio); /* [DEPRECATED] */
}

if (ceph_wbc.locked_pages == 0)
    lock_page(page);  /* first page */
else if (!trylock_page(page))
    break;

This logic should exclude the ignoring of writeback
state of folios/pages.

Signed-off-by: default avatarViacheslav Dubeyko <Slava.Dubeyko@ibm.com>
Link: https://lore.kernel.org/r/20250205000249.123054-3-slava@dubeyko.com


Tested-by: default avatarDavid Howells <dhowells@redhat.com>
Signed-off-by: default avatarChristian Brauner <brauner@kernel.org>
parent f08068df
Loading
Loading
Loading
Loading
+365 −203
Original line number Diff line number Diff line
@@ -978,6 +978,27 @@ static void writepages_finish(struct ceph_osd_request *req)
	ceph_dec_osd_stopping_blocker(fsc->mdsc);
}

static inline
bool is_forced_umount(struct address_space *mapping)
{
	struct inode *inode = mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
	struct ceph_client *cl = fsc->client;

	if (ceph_inode_is_shutdown(inode)) {
		if (ci->i_wrbuffer_ref > 0) {
			pr_warn_ratelimited_client(cl,
				"%llx.%llx %lld forced umount\n",
				ceph_vinop(inode), ceph_ino(inode));
		}
		mapping_set_error(mapping, -EIO);
		return true;
	}

	return false;
}

static inline
unsigned int ceph_define_write_size(struct address_space *mapping)
{
@@ -1046,261 +1067,402 @@ void ceph_init_writeback_ctl(struct address_space *mapping,
	ceph_wbc->data_pages = NULL;
}

/*
 * initiate async writeback
 */
static int ceph_writepages_start(struct address_space *mapping,
				 struct writeback_control *wbc)
static inline
int ceph_define_writeback_range(struct address_space *mapping,
				struct writeback_control *wbc,
				struct ceph_writeback_ctl *ceph_wbc)
{
	struct inode *inode = mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
	struct ceph_client *cl = fsc->client;
	struct ceph_vino vino = ceph_vino(inode);
	struct ceph_snap_context *pgsnapc;
	struct ceph_writeback_ctl ceph_wbc;
	struct ceph_osd_request *req = NULL;
	int rc = 0;
	bool caching = ceph_is_cache_enabled(inode);

	if (wbc->sync_mode == WB_SYNC_NONE &&
	    fsc->write_congested)
		return 0;

	doutc(cl, "%llx.%llx (mode=%s)\n", ceph_vinop(inode),
	      wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
	      (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));

	if (ceph_inode_is_shutdown(inode)) {
		if (ci->i_wrbuffer_ref > 0) {
			pr_warn_ratelimited_client(cl,
				"%llx.%llx %lld forced umount\n",
				ceph_vinop(inode), ceph_ino(inode));
		}
		mapping_set_error(mapping, -EIO);
		return -EIO; /* we're in a forced umount, don't write! */
	}

	ceph_init_writeback_ctl(mapping, wbc, &ceph_wbc);

retry:
	/* find oldest snap context with dirty data */
	ceph_wbc.snapc = get_oldest_context(inode, &ceph_wbc, NULL);
	if (!ceph_wbc.snapc) {
	ceph_wbc->snapc = get_oldest_context(inode, ceph_wbc, NULL);
	if (!ceph_wbc->snapc) {
		/* hmm, why does writepages get called when there
		   is no dirty data? */
		doutc(cl, " no snap context with dirty data?\n");
		goto out;
		return -ENODATA;
	}

	doutc(cl, " oldest snapc is %p seq %lld (%d snaps)\n",
	      ceph_wbc.snapc, ceph_wbc.snapc->seq,
	      ceph_wbc.snapc->num_snaps);
	      ceph_wbc->snapc, ceph_wbc->snapc->seq,
	      ceph_wbc->snapc->num_snaps);

	ceph_wbc->should_loop = false;

	ceph_wbc.should_loop = false;
	if (ceph_wbc.head_snapc && ceph_wbc.snapc != ceph_wbc.last_snapc) {
	if (ceph_wbc->head_snapc && ceph_wbc->snapc != ceph_wbc->last_snapc) {
		/* where to start/end? */
		if (wbc->range_cyclic) {
			ceph_wbc.index = ceph_wbc.start_index;
			ceph_wbc.end = -1;
			if (ceph_wbc.index > 0)
				ceph_wbc.should_loop = true;
			doutc(cl, " cyclic, start at %lu\n", ceph_wbc.index);
			ceph_wbc->index = ceph_wbc->start_index;
			ceph_wbc->end = -1;
			if (ceph_wbc->index > 0)
				ceph_wbc->should_loop = true;
			doutc(cl, " cyclic, start at %lu\n", ceph_wbc->index);
		} else {
			ceph_wbc.index = wbc->range_start >> PAGE_SHIFT;
			ceph_wbc.end = wbc->range_end >> PAGE_SHIFT;
			ceph_wbc->index = wbc->range_start >> PAGE_SHIFT;
			ceph_wbc->end = wbc->range_end >> PAGE_SHIFT;
			if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
				ceph_wbc.range_whole = true;
				ceph_wbc->range_whole = true;
			doutc(cl, " not cyclic, %lu to %lu\n",
			      ceph_wbc.index, ceph_wbc.end);
				ceph_wbc->index, ceph_wbc->end);
		}
	} else if (!ceph_wbc.head_snapc) {
	} else if (!ceph_wbc->head_snapc) {
		/* Do not respect wbc->range_{start,end}. Dirty pages
		 * in that range can be associated with newer snapc.
		 * They are not writeable until we write all dirty pages
		 * associated with 'snapc' get written */
		if (ceph_wbc.index > 0)
			ceph_wbc.should_loop = true;
		if (ceph_wbc->index > 0)
			ceph_wbc->should_loop = true;
		doutc(cl, " non-head snapc, range whole\n");
	}

	if (wbc->sync_mode == WB_SYNC_ALL || wbc->tagged_writepages)
		tag_pages_for_writeback(mapping, ceph_wbc.index, ceph_wbc.end);

	ceph_put_snap_context(ceph_wbc.last_snapc);
	ceph_wbc.last_snapc = ceph_wbc.snapc;
	ceph_put_snap_context(ceph_wbc->last_snapc);
	ceph_wbc->last_snapc = ceph_wbc->snapc;

	while (!ceph_wbc.done && ceph_wbc.index <= ceph_wbc.end) {
		unsigned i;
		struct page *page;
	return 0;
}

		ceph_wbc.max_pages = ceph_wbc.wsize >> PAGE_SHIFT;
static inline
bool has_writeback_done(struct ceph_writeback_ctl *ceph_wbc)
{
	return ceph_wbc->done && ceph_wbc->index > ceph_wbc->end;
}

get_more_pages:
		ceph_wbc.nr_folios = filemap_get_folios_tag(mapping,
							    &ceph_wbc.index,
							    ceph_wbc.end,
							    ceph_wbc.tag,
							    &ceph_wbc.fbatch);
		doutc(cl, "pagevec_lookup_range_tag got %d\n",
			ceph_wbc.nr_folios);
		if (!ceph_wbc.nr_folios && !ceph_wbc.locked_pages)
			break;
		for (i = 0; i < ceph_wbc.nr_folios &&
			    ceph_wbc.locked_pages < ceph_wbc.max_pages; i++) {
			struct folio *folio = ceph_wbc.fbatch.folios[i];
static inline
bool can_next_page_be_processed(struct ceph_writeback_ctl *ceph_wbc,
				unsigned index)
{
	return index < ceph_wbc->nr_folios &&
		ceph_wbc->locked_pages < ceph_wbc->max_pages;
}

			page = &folio->page;
			doutc(cl, "? %p idx %lu\n", page, page->index);
			if (ceph_wbc.locked_pages == 0)
				lock_page(page);  /* first page */
			else if (!trylock_page(page))
				break;
static
int ceph_check_page_before_write(struct address_space *mapping,
				 struct writeback_control *wbc,
				 struct ceph_writeback_ctl *ceph_wbc,
				 struct folio *folio)
{
	struct inode *inode = mapping->host;
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
	struct ceph_client *cl = fsc->client;
	struct ceph_snap_context *pgsnapc;
	struct page *page = &folio->page;

	/* only dirty pages, or our accounting breaks */
			if (unlikely(!PageDirty(page)) ||
			    unlikely(page->mapping != mapping)) {
	if (unlikely(!PageDirty(page)) || unlikely(page->mapping != mapping)) {
		doutc(cl, "!dirty or !mapping %p\n", page);
				unlock_page(page);
				continue;
		return -ENODATA;
	}

	/* only if matching snap context */
	pgsnapc = page_snap_context(page);
			if (pgsnapc != ceph_wbc.snapc) {
	if (pgsnapc != ceph_wbc->snapc) {
		doutc(cl, "page snapc %p %lld != oldest %p %lld\n",
		      pgsnapc, pgsnapc->seq,
				      ceph_wbc.snapc, ceph_wbc.snapc->seq);
				if (!ceph_wbc.should_loop &&
				    !ceph_wbc.head_snapc &&
		      ceph_wbc->snapc, ceph_wbc->snapc->seq);

		if (!ceph_wbc->should_loop && !ceph_wbc->head_snapc &&
		    wbc->sync_mode != WB_SYNC_NONE)
					ceph_wbc.should_loop = true;
				unlock_page(page);
				continue;
			ceph_wbc->should_loop = true;

		return -ENODATA;
	}
			if (page_offset(page) >= ceph_wbc.i_size) {

	if (page_offset(page) >= ceph_wbc->i_size) {
		doutc(cl, "folio at %lu beyond eof %llu\n",
				      folio->index, ceph_wbc.i_size);
				if ((ceph_wbc.size_stable ||
		      folio->index, ceph_wbc->i_size);

		if ((ceph_wbc->size_stable ||
		    folio_pos(folio) >= i_size_read(inode)) &&
		    folio_clear_dirty_for_io(folio))
					folio_invalidate(folio, 0,
							folio_size(folio));
				folio_unlock(folio);
				continue;
			folio_invalidate(folio, 0, folio_size(folio));

		return -ENODATA;
	}
			if (ceph_wbc.strip_unit_end &&
			    (page->index > ceph_wbc.strip_unit_end)) {

	if (ceph_wbc->strip_unit_end &&
	    (page->index > ceph_wbc->strip_unit_end)) {
		doutc(cl, "end of strip unit %p\n", page);
				unlock_page(page);
				break;
		return -E2BIG;
	}

	return 0;
}

static inline
void __ceph_allocate_page_array(struct ceph_writeback_ctl *ceph_wbc,
				unsigned int max_pages)
{
	ceph_wbc->pages = kmalloc_array(max_pages,
					sizeof(*ceph_wbc->pages),
					GFP_NOFS);
	if (!ceph_wbc->pages) {
		ceph_wbc->from_pool = true;
		ceph_wbc->pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
		BUG_ON(!ceph_wbc->pages);
	}
}

static inline
void ceph_allocate_page_array(struct address_space *mapping,
			      struct ceph_writeback_ctl *ceph_wbc,
			      struct page *page)
{
	struct inode *inode = mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	u64 objnum;
	u64 objoff;
	u32 xlen;

	/* prepare async write request */
	ceph_wbc->offset = (u64)page_offset(page);
	ceph_calc_file_object_mapping(&ci->i_layout,
					ceph_wbc->offset, ceph_wbc->wsize,
					&objnum, &objoff, &xlen);

	ceph_wbc->num_ops = 1;
	ceph_wbc->strip_unit_end = page->index + ((xlen - 1) >> PAGE_SHIFT);

	BUG_ON(ceph_wbc->pages);
	ceph_wbc->max_pages = calc_pages_for(0, (u64)xlen);
	__ceph_allocate_page_array(ceph_wbc, ceph_wbc->max_pages);

	ceph_wbc->len = 0;
}

static inline
bool is_page_index_contiguous(struct ceph_writeback_ctl *ceph_wbc,
			      struct page *page)
{
	return page->index == (ceph_wbc->offset + ceph_wbc->len) >> PAGE_SHIFT;
}

static inline
bool is_num_ops_too_big(struct ceph_writeback_ctl *ceph_wbc)
{
	return ceph_wbc->num_ops >=
		(ceph_wbc->from_pool ?  CEPH_OSD_SLAB_OPS : CEPH_OSD_MAX_OPS);
}

static inline
bool is_write_congestion_happened(struct ceph_fs_client *fsc)
{
	return atomic_long_inc_return(&fsc->writeback_count) >
		CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb);
}

static inline
int ceph_move_dirty_page_in_page_array(struct address_space *mapping,
					struct writeback_control *wbc,
					struct ceph_writeback_ctl *ceph_wbc,
					struct page *page)
{
	struct inode *inode = mapping->host;
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
	struct ceph_client *cl = fsc->client;
	struct page **pages = ceph_wbc->pages;
	unsigned int index = ceph_wbc->locked_pages;
	gfp_t gfp_flags = ceph_wbc->locked_pages ? GFP_NOWAIT : GFP_NOFS;

	if (IS_ENCRYPTED(inode)) {
		pages[index] = fscrypt_encrypt_pagecache_blocks(page,
								PAGE_SIZE,
								0,
								gfp_flags);
		if (IS_ERR(pages[index])) {
			if (PTR_ERR(pages[index]) == -EINVAL) {
				pr_err_client(cl, "inode->i_blkbits=%hhu\n",
						inode->i_blkbits);
			}

			/* better not fail on first page! */
			BUG_ON(ceph_wbc->locked_pages == 0);

			pages[index] = NULL;
			return PTR_ERR(pages[index]);
		}
	} else {
		pages[index] = page;
	}

	ceph_wbc->locked_pages++;

	return 0;
}

static
int ceph_process_folio_batch(struct address_space *mapping,
			     struct writeback_control *wbc,
			     struct ceph_writeback_ctl *ceph_wbc)
{
	struct inode *inode = mapping->host;
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
	struct ceph_client *cl = fsc->client;
	struct folio *folio = NULL;
	struct page *page = NULL;
	unsigned i;
	int rc = 0;

	for (i = 0; can_next_page_be_processed(ceph_wbc, i); i++) {
		folio = ceph_wbc->fbatch.folios[i];

		if (!folio)
			continue;

		page = &folio->page;

		doutc(cl, "? %p idx %lu, folio_test_writeback %#x, "
			"folio_test_dirty %#x, folio_test_locked %#x\n",
			page, page->index, folio_test_writeback(folio),
			folio_test_dirty(folio),
			folio_test_locked(folio));

		if (folio_test_writeback(folio) ||
		    folio_test_private_2(folio) /* [DEPRECATED] */) {
				if (wbc->sync_mode == WB_SYNC_NONE) {
					doutc(cl, "%p under writeback\n", folio);
					folio_unlock(folio);
					continue;
				}
			doutc(cl, "waiting on writeback %p\n", folio);
			folio_wait_writeback(folio);
			folio_wait_private_2(folio); /* [DEPRECATED] */
			continue;
		}

		if (ceph_wbc->locked_pages == 0)
			lock_page(page);  /* first page */
		else if (!trylock_page(page))
			break;

		rc = ceph_check_page_before_write(mapping, wbc,
						  ceph_wbc, folio);
		if (rc == -ENODATA) {
			rc = 0;
			unlock_page(page);
			ceph_wbc->fbatch.folios[i] = NULL;
			continue;
		} else if (rc == -E2BIG) {
			rc = 0;
			unlock_page(page);
			ceph_wbc->fbatch.folios[i] = NULL;
			break;
		}

		if (!clear_page_dirty_for_io(page)) {
			doutc(cl, "%p !clear_page_dirty_for_io\n", page);
			unlock_page(page);
			ceph_wbc->fbatch.folios[i] = NULL;
			continue;
		}

		/*
		 * We have something to write.  If this is
		 * the first locked page this time through,
			 * calculate max possinle write size and
		 * calculate max possible write size and
		 * allocate a page array
		 */
			if (ceph_wbc.locked_pages == 0) {
				u64 objnum;
				u64 objoff;
				u32 xlen;

				/* prepare async write request */
				ceph_wbc.offset = (u64)page_offset(page);
				ceph_calc_file_object_mapping(&ci->i_layout,
							      ceph_wbc.offset,
							      ceph_wbc.wsize,
							      &objnum, &objoff,
							      &xlen);
				ceph_wbc.len = xlen;

				ceph_wbc.num_ops = 1;
				ceph_wbc.strip_unit_end = page->index +
					((ceph_wbc.len - 1) >> PAGE_SHIFT);

				BUG_ON(ceph_wbc.pages);
				ceph_wbc.max_pages =
					calc_pages_for(0, (u64)ceph_wbc.len);
				ceph_wbc.pages = kmalloc_array(ceph_wbc.max_pages,
						      sizeof(*ceph_wbc.pages),
						      GFP_NOFS);
				if (!ceph_wbc.pages) {
					ceph_wbc.from_pool = true;
					ceph_wbc.pages =
						mempool_alloc(ceph_wb_pagevec_pool,
								GFP_NOFS);
					BUG_ON(!ceph_wbc.pages);
				}

				ceph_wbc.len = 0;
			} else if (page->index !=
				   (ceph_wbc.offset + ceph_wbc.len) >> PAGE_SHIFT) {
				if (ceph_wbc.num_ops >=
				    (ceph_wbc.from_pool ?  CEPH_OSD_SLAB_OPS :
							     CEPH_OSD_MAX_OPS)) {
		if (ceph_wbc->locked_pages == 0) {
			ceph_allocate_page_array(mapping, ceph_wbc, page);
		} else if (!is_page_index_contiguous(ceph_wbc, page)) {
			if (is_num_ops_too_big(ceph_wbc)) {
				redirty_page_for_writepage(wbc, page);
				unlock_page(page);
				break;
			}

				ceph_wbc.num_ops++;
				ceph_wbc.offset = (u64)page_offset(page);
				ceph_wbc.len = 0;
			ceph_wbc->num_ops++;
			ceph_wbc->offset = (u64)page_offset(page);
			ceph_wbc->len = 0;
		}

		/* note position of first page in fbatch */
		doutc(cl, "%llx.%llx will write page %p idx %lu\n",
		      ceph_vinop(inode), page, page->index);

			if (atomic_long_inc_return(&fsc->writeback_count) >
			    CONGESTION_ON_THRESH(
				    fsc->mount_options->congestion_kb))
				fsc->write_congested = true;
		fsc->write_congested = is_write_congestion_happened(fsc);

			if (IS_ENCRYPTED(inode)) {
				ceph_wbc.pages[ceph_wbc.locked_pages] =
					fscrypt_encrypt_pagecache_blocks(page,
						PAGE_SIZE, 0,
						ceph_wbc.locked_pages ?
							GFP_NOWAIT : GFP_NOFS);
				if (IS_ERR(ceph_wbc.pages[ceph_wbc.locked_pages])) {
					if (PTR_ERR(ceph_wbc.pages[ceph_wbc.locked_pages]) == -EINVAL)
						pr_err_client(cl,
							"inode->i_blkbits=%hhu\n",
							inode->i_blkbits);
					/* better not fail on first page! */
					BUG_ON(ceph_wbc.locked_pages == 0);
					ceph_wbc.pages[ceph_wbc.locked_pages] = NULL;
		rc = ceph_move_dirty_page_in_page_array(mapping, wbc,
							ceph_wbc, page);
		if (rc) {
			redirty_page_for_writepage(wbc, page);
			unlock_page(page);
			break;
		}
				++ceph_wbc.locked_pages;
			} else {
				ceph_wbc.pages[ceph_wbc.locked_pages++] = page;

		ceph_wbc->fbatch.folios[i] = NULL;
		ceph_wbc->len += thp_size(page);
	}

			ceph_wbc.fbatch.folios[i] = NULL;
			ceph_wbc.len += thp_size(page);
	ceph_wbc->processed_in_fbatch = i;

	return rc;
}

/*
 * initiate async writeback
 */
static int ceph_writepages_start(struct address_space *mapping,
				 struct writeback_control *wbc)
{
	struct inode *inode = mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
	struct ceph_client *cl = fsc->client;
	struct ceph_vino vino = ceph_vino(inode);
	struct ceph_writeback_ctl ceph_wbc;
	struct ceph_osd_request *req = NULL;
	int rc = 0;
	bool caching = ceph_is_cache_enabled(inode);

	if (wbc->sync_mode == WB_SYNC_NONE &&
	    fsc->write_congested)
		return 0;

	doutc(cl, "%llx.%llx (mode=%s)\n", ceph_vinop(inode),
	      wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
	      (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));

	if (is_forced_umount(mapping)) {
		/* we're in a forced umount, don't write! */
		return -EIO;
	}

	ceph_init_writeback_ctl(mapping, wbc, &ceph_wbc);

retry:
	rc = ceph_define_writeback_range(mapping, wbc, &ceph_wbc);
	if (rc == -ENODATA) {
		/* hmm, why does writepages get called when there
		   is no dirty data? */
		rc = 0;
		goto out;
	}

	if (wbc->sync_mode == WB_SYNC_ALL || wbc->tagged_writepages)
		tag_pages_for_writeback(mapping, ceph_wbc.index, ceph_wbc.end);

	while (!has_writeback_done(&ceph_wbc)) {
		unsigned i;
		struct page *page;

		ceph_wbc.locked_pages = 0;
		ceph_wbc.max_pages = ceph_wbc.wsize >> PAGE_SHIFT;

get_more_pages:
		ceph_folio_batch_reinit(&ceph_wbc);

		ceph_wbc.nr_folios = filemap_get_folios_tag(mapping,
							    &ceph_wbc.index,
							    ceph_wbc.end,
							    ceph_wbc.tag,
							    &ceph_wbc.fbatch);
		doutc(cl, "pagevec_lookup_range_tag for tag %#x got %d\n",
			ceph_wbc.tag, ceph_wbc.nr_folios);

		if (!ceph_wbc.nr_folios && !ceph_wbc.locked_pages)
			break;

		rc = ceph_process_folio_batch(mapping, wbc, &ceph_wbc);
		if (rc)
			goto release_folios;

		/* did we get anything? */
		if (!ceph_wbc.locked_pages)
			goto release_folios;