/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 1999
 *	Sleepycat Software.  All rights reserved.
 */

#include "db_config.h"

#ifndef lint
static const char sccsid[] = "@(#)qam.c	11.23 (Sleepycat) 10/26/99";
#endif /* not lint */

#ifndef NO_SYSTEM_INCLUDES
#include <sys/types.h>

#include <errno.h>
#include <string.h>
#endif

#include "db_int.h"
#include "db_page.h"
#include "db_shash.h"
#include "db_am.h"
#include "lock.h"
#include "qam.h"
#include "mp.h"

static int CDB___qam_c_close __P((DBC *));
static int CDB___qam_c_del __P((DBC *, u_int32_t));
static int CDB___qam_c_get __P((DBC *, DBT *, DBT *, u_int32_t));
static int CDB___qam_c_put __P((DBC *, DBT *, DBT *, u_int32_t));
static int CDB___qam_getno __P((DB *, const DBT *, db_recno_t *));
static int CDB___qam_i_delete __P((DBC *));
static int CDB___qam_i_put __P((DBC *, DBT *, u_int32_t));
static int CDB___qam_nrecs __P((DBC *, db_recno_t *, db_recno_t *));
static int CDB___qam_position
	       __P((DBC *, db_recno_t *, db_lockmode_t, db_recno_t, int *));
static int CDB___qam_c_destroy __P((DBC *));

/*
 * CDB___qam_nrecs --
 *	Return the record number for the head and tail of the queue.
 */
static int
CDB___qam_nrecs(dbc, rep, start)
	DBC *dbc;
	db_recno_t *rep, *start;
{
	DB *dbp;
	DB_LOCK lock;
	QMETA *meta;
	db_pgno_t pg;
	int ret;

	dbp = dbc->dbp;

	pg = ((QUEUE *)dbp->q_internal)->q_meta;
	if ((ret = CDB___db_lget(dbc, 0, pg,  DB_LOCK_READ, 0, &lock)) != 0)
		return (ret);
	if ((ret = CDB_memp_fget(dbp->mpf, &pg, 0, &meta)) != 0) {
		/* We did not fetch it, we can release the lock. */
		(void)__LPUT(dbc, lock);
		return (ret);
	}

	*rep = meta->cur_recno;
	*start = meta->start;

	if ((ret = CDB_memp_fput(dbp->mpf, meta, 0)) != 0)
		return (ret);
	/* Don't hold the meta page long term. */
	if ((ret = __LPUT(dbc, lock)) != 0)
		return (ret);

	return (0);
}

/*
 * CDB___qam_position --
 *	Position a queued access method cursor at a record.  This returns
 *	the page locked.  *exactp will be set if the record is valid.
 */
static int
CDB___qam_position(dbc, recnop, lock_mode, start, exactp)
	DBC *dbc;		/* open cursor */
	db_recno_t *recnop;	/* pointer to recno to find */
	db_lockmode_t lock_mode;/* locking: read or write */
	db_recno_t  start;	/* meta.start */
	int *exactp;		/* indicate if it was found */
{
	QUEUE_CURSOR *cp;
	DB *dbp;
	QAMDATA  *qp;
	db_pgno_t pg;
	int ret;

	dbp = dbc->dbp;
	cp = (QUEUE_CURSOR *) dbc->internal;

	/* Fetch the page for this recno. */
	pg = QAM_RECNO_PAGE(dbp, start, *recnop);

	if ((ret = CDB___db_lget(dbc, 0, pg,  lock_mode, 0, &cp->lock)) != 0)
		return (ret);
	if ((ret = CDB_memp_fget(dbp->mpf, &pg,
	    lock_mode == DB_LOCK_WRITE ? DB_MPOOL_CREATE : 0,
	    &cp->page)) != 0) {
		/* We did not fetch it, we can release the lock. */
		(void)__LPUT(dbc, cp->lock);
		cp->lock.off = LOCK_INVALID;
		return (ret);
	}
	cp->pgno = pg;
	cp->indx = QAM_RECNO_INDEX(dbp, pg, start, *recnop);

	if (cp->page->pgno == 0) {
		if (F_ISSET(dbp, DB_AM_RDONLY)) {
			*exactp = 0;
			return (0);
		}
		cp->page->pgno = pg;
		cp->page->type = P_QAMDATA;
	}

	qp = QAM_GET_RECORD(dbp, cp->page, cp->indx);
	*exactp = F_ISSET(qp, QAM_VALID);

	return (ret);
}

/*
 * CDB___qam_pitem --
 *	Put an item on a queue page.  Copy the data to the page and set the
 *	VALID and SET bits.  If logging and the record was previously set,
 *	log that data, otherwise just log the new data.
 *
 *   pagep must be write locked
 *
 * PUBLIC: int CDB___qam_pitem
 * PUBLIC:     __P((DBC *,  QPAGE *, u_int32_t, db_recno_t, DBT *));
 */
int
CDB___qam_pitem(dbc, pagep, indx, recno, data)
	DBC *dbc;
	QPAGE *pagep;
	u_int32_t indx;
	db_recno_t recno;
	DBT *data;
{
	DB *dbp;
	DBT olddata, pdata, *datap;
	QAMDATA *qp;
	QUEUE *t;
	u_int32_t size;
	u_int8_t *dest, *p;
	int alloced, ret;

	alloced = ret = 0;

	dbp = dbc->dbp;
	t = (QUEUE *)dbp->q_internal;

	if (data->size > t->re_len)
		return (EINVAL);

	qp = QAM_GET_RECORD(dbp, pagep, indx);

	p = qp->data;
	size = data->size;
	datap = data;
	if (F_ISSET(data, DB_DBT_PARTIAL)) {
		if (data->size != data->dlen)
			return (EINVAL);
		if (data->doff + data->dlen > t->re_len)
			return (EINVAL);
		if (data->size == t->re_len)
			goto no_partial;

		/*
		 * If we are logging, then we have to build the record
		 * first, otherwise, we can simply drop the change
		 * directly on the page.  After this clause, make
		 * sure that datap and p are set up correctly so that
		 * copying datap into p does the right thing.
		 *
		 * Note, I am changing this so that if the existing
		 * record is not valid, we create a complete record
		 * to log so that both this and the recovery code is simpler.
		 */

		if (DB_LOGGING(dbc) || !F_ISSET(qp, QAM_VALID)) {
			datap = &pdata;
			memset(datap, 0, sizeof(*datap));

			if ((ret =
			    CDB___os_malloc(t->re_len, NULL, &datap->data)) != 0)
				return (ret);
			alloced = 1;
			datap->size = t->re_len;

			/*
			 * Construct the record if it's valid, otherwise set it
			 * all to the pad character.
			 */
			dest = datap->data;
			if (F_ISSET(qp, QAM_VALID))
				memcpy(dest, p, t->re_len);
			else
				memset(dest, t->re_pad, t->re_len);

			dest += data->doff;
			memcpy(dest, data->data, data->size);
		} else {
			datap = data;
			p += data->doff;
		}
	}

no_partial:
	if (DB_LOGGING(dbc)) {
		olddata.size = 0;
		if (F_ISSET(qp, QAM_SET)) {
			olddata.data = qp->data;
			olddata.size = t->re_len;
		}
		if ((ret = CDB___qam_add_log(dbp->dbenv, dbc->txn, &LSN(pagep),
		    0, dbp->log_fileid, &LSN(pagep), pagep->pgno,
		    indx, recno, datap, qp->flags,
		    olddata.size == 0 ? NULL : &olddata)) != 0)
			goto err;
	}

	F_SET(qp, QAM_VALID | QAM_SET);
	memcpy(p, datap->data, datap->size);
	if (!F_ISSET(data, DB_DBT_PARTIAL))
		memset(p + datap->size,  t->re_pad, t->re_len - datap->size);

err:	if (alloced)
		CDB___os_free(datap->data, t->re_len);

	return (ret);
}
/*
 * CDB___qam_c_put
 * 	Cursor put for queued access method.
 *	BEFORE and AFTER cannot be specified.
 */
static int
CDB___qam_c_put(dbc, key, data, flags)
	DBC *dbc;
	DBT *key, *data;
	u_int32_t flags;
{
	QUEUE_CURSOR *cp;
	DB_LOCK lock;
	int ret;

	/* Check for invalid flags. */
	if ((ret = CDB___db_cputchk(dbc->dbp, key, data, flags,
	    F_ISSET(dbc->dbp, DB_AM_RDONLY),
	    ((QUEUE_CURSOR *)dbc->internal)->recno != RECNO_OOB)) != 0)
		return (ret);

	DEBUG_LWRITE(dbc, dbc->txn, "qam_c_put", NULL, data, flags);

	cp = (QUEUE_CURSOR *)dbc->internal;
	lock = cp->lock;

	ret = CDB___qam_i_put(dbc, data, flags);

	/* Release any previous lock, if not in a transaction. */
	if (ret == 0 && lock.off != LOCK_INVALID) {
		if ((ret = __TLPUT(dbc, lock)) != 0)
			return (ret);
		cp->lock.off = LOCK_INVALID;
	}
	return (0);
}

/*
 * CDB___qam_i_put
 * 	Internal cursor put for queued access method.
 */
static int
CDB___qam_i_put(dbc, data, flags)
	DBC *dbc;
	DBT *data;
	u_int32_t flags;
{
	QUEUE_CURSOR *cp;
	DB *dbp;
	DB_LOCK lock;
	QMETA *meta;
	db_pgno_t pg;
	db_recno_t new_cur, new_first;
	u_int32_t opcode;
	int exact, ret, t_ret;

	dbp = dbc->dbp;
	PANIC_CHECK(dbp->dbenv);
	/*
	 * If we are running CDB, this had better be either a write
	 * cursor or an immediate writer.  If it's a regular writer,
	 * that means we have an IWRITE lock and we need to upgrade
	 * it to a write lock.
	 */
	if (F_ISSET(dbp->dbenv, DB_ENV_CDB)) {
		if (!F_ISSET(dbc, DBC_WRITECURSOR | DBC_WRITER))
			return (EINVAL);

		if (F_ISSET(dbc, DBC_WRITECURSOR) &&
		    (ret = CDB_lock_get(dbp->dbenv, dbc->locker, DB_LOCK_UPGRADE,
		    &dbc->lock_dbt, DB_LOCK_WRITE, &dbc->mylock)) != 0)
			return (ret);
	}

	cp = (QUEUE_CURSOR *)dbc->internal;

	/* Write lock the record. */
	if ((ret = CDB___db_lget(dbc,
	    0, cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0)
		return (ret);

	if ((ret = CDB___qam_position(dbc,
	    &cp->recno, DB_LOCK_WRITE, cp->start, &exact)) != 0) {
		/* We could not get the page, we can release the record lock. */
		__LPUT(dbc, lock);
		return (ret);
	}

	if (exact && flags == DB_NOOVERWRITE) {
		ret = __TLPUT(dbc, lock);
		/* Doing record locking, release the page lock */
		if ((t_ret = __LPUT(dbc, cp->lock)) == 0)
			cp->lock.off = LOCK_INVALID;
		else
			if (ret == 0)
				ret = t_ret;
		if ((t_ret = CDB_memp_fput(dbp->mpf, cp->page, 0)) != 0 && ret == 0)
			ret = t_ret;
		return (ret == 0 ? DB_KEYEXIST : ret);
	}

	/* Put the item on the page. */
	ret = CDB___qam_pitem(dbc, (QPAGE *)cp->page, cp->indx, cp->recno, data);

	/* Doing record locking, release the page lock */
	if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
		ret = t_ret;
	if ((t_ret = CDB_memp_fput(dbp->mpf, cp->page, DB_MPOOL_DIRTY)) && ret == 0)
		ret = t_ret;
	cp->lock = lock;
	cp->lock_mode = DB_LOCK_WRITE;
	if (ret != 0)
		return (ret);

	/* We may need to reset the head or tail of the queue. */
	pg = ((QUEUE *)dbp->q_internal)->q_meta;
	if ((ret = CDB___db_lget(dbc, 0, pg,  DB_LOCK_WRITE, 0, &lock)) != 0)
		return (ret);
	if ((ret = CDB_memp_fget(dbp->mpf, &pg, 0, &meta)) != 0) {
		/* We did not fetch it, we can release the lock. */
		(void)__LPUT(dbc, lock);
		return (ret);
	}

	opcode = 0;
	new_cur = new_first = 0;

	if (cp->recno > meta->cur_recno) {
		new_cur = cp->recno;
		opcode |= QAM_SETCUR;
	}
	if (cp->recno < meta->first_recno || meta->first_recno < meta->start) {
		new_first = cp->recno;
		opcode |= QAM_SETFIRST;
	}

	if (opcode != 0 && DB_LOGGING(dbc)) {
		ret = CDB___qam_mvptr_log(dbp->dbenv, dbc->txn, &meta->dbmeta.lsn,
		    0, opcode, dbp->log_fileid, meta->first_recno, new_first,
		    meta->cur_recno, new_cur, &meta->dbmeta.lsn);
	}

	if (opcode & QAM_SETCUR)
		meta->cur_recno = cp->recno;
	if (opcode & QAM_SETFIRST)
		meta->first_recno = cp->recno;

	if ((t_ret =
	    CDB_memp_fput(dbp->mpf, meta, opcode != 0 ? DB_MPOOL_DIRTY : 0)) != 0 &&
	    ret == 0)
		ret = t_ret;

	/* Don't hold the meta page long term. */
	if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
		ret = t_ret;
	return (ret);
}

/*
 * CDB___qam_put --
 * 	Add a record to the queue.
 *	If we are doing anything but appending, just call qam_c_put to do the
 *	work.  Otherwise we fast path things here.
 *
 * PUBLIC: int CDB___qam_put __P((DB *, DB_TXN *, DBT *, DBT *, u_int32_t));
 */
int
CDB___qam_put(dbp, txn, key, data, flags)
	DB *dbp;
	DB_TXN *txn;
	DBT *key, *data;
	u_int32_t flags;
{
	QUEUE_CURSOR *cp;
	DBC *dbc;
	DB_LOCK lock;
	QMETA *meta;
	QPAGE *page;
	db_pgno_t pg;
	db_recno_t recno, start, total;
	int ret, t_ret;

	PANIC_CHECK(dbp->dbenv);

	/* Allocate a cursor. */
	if ((ret = dbp->cursor(dbp, txn, &dbc, DB_WRITELOCK)) != 0)
		return (ret);

	DEBUG_LWRITE(dbc, txn, "qam_put", key, data, flags);

	cp = dbc->internal;

	/* Check for invalid flags. */
	if ((ret = CDB___db_putchk(dbp,
	    key, data, flags, F_ISSET(dbp, DB_AM_RDONLY), 0)) != 0)
		goto done;

	/* If not appending, then just call the cursor routine */
	if (flags != DB_APPEND) {
		if ((ret = CDB___qam_getno(dbp, key, &cp->recno)) != 0)
			goto done;
		CDB___qam_nrecs(dbc, &total, &cp->start);

		ret = CDB___qam_i_put(dbc, data, flags);
		goto done;
	}

	/* Write lock the meta page. */
	pg = ((QUEUE *)dbp->q_internal)->q_meta;
	if ((ret = CDB___db_lget(dbc, 0, pg,  DB_LOCK_WRITE, 0, &lock)) != 0)
		goto done;
	if ((ret = CDB_memp_fget(dbp->mpf, &pg, 0, &meta)) != 0) {
		/* We did not fetch it, we can release the lock. */
		(void)__LPUT(dbc, lock);
		goto done;
	}

	/* Record that we are going to allocate a record. */
	if (DB_LOGGING(dbc)) {
		CDB___qam_inc_log(dbp->dbenv,
		    txn, &meta->dbmeta.lsn,
		    0, dbp->log_fileid, &meta->dbmeta.lsn);
	}

	/* Get the next record number. */
	recno = ++meta->cur_recno;
	start = meta->start;

	if (meta->first_recno < meta->start || meta->first_recno > recno)
		meta->first_recno = recno;

	/* Release the meta page. */
	if ((ret = CDB_memp_fput(dbp->mpf, meta, DB_MPOOL_DIRTY)) != 0)
		return (ret);

	/* Lock the record and release meta page lock. */
	if ((ret = CDB___db_lget(dbc,
	    1, recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0)
		goto done;

	cp->lock = lock;
	cp->lock_mode = DB_LOCK_WRITE;

	pg = QAM_RECNO_PAGE(dbp, start, recno);

	/* Fetch and write lock the data page. */
	if ((ret = CDB___db_lget(dbc, 0, pg,  DB_LOCK_WRITE, 0, &lock)) != 0)
		goto done;
	if ((ret = CDB_memp_fget(dbp->mpf, &pg, DB_MPOOL_CREATE, &page)) != 0) {
		/* We did not fetch it, we can release the lock. */
		(void)__LPUT(dbc, lock);
		goto done;
	}

	/* See if this is a new page. */
	if (page->pgno == 0) {
		page->pgno = pg;
		page->type = P_QAMDATA;
	}

	/* Put the item on the page and log it. */
	if ((ret = CDB___qam_pitem(dbc, page,
	    QAM_RECNO_INDEX(dbp, pg, start, recno), recno, data)) != 0) {
		/* The put failed.  We did not change the row. */
		(void)__LPUT(dbc, cp->lock);
		(void)CDB_memp_fput(dbp->mpf, cp->page, DB_MPOOL_DIRTY);
 		return (ret);
	}

	/* Doing record locking, release the page lock */
	if ((ret = __LPUT(dbc, lock)) != 0)
		return (ret);
	if ((ret = CDB_memp_fput(dbp->mpf, page, DB_MPOOL_DIRTY)) != 0)
		return (ret);

	/* Return the record number to the user. */
	*(db_recno_t *)key->data = recno;

done:
	/* Discard the cursor. */
	if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
		ret = t_ret;

	return (ret);
}

/*
 * CDB___qam_i_delete --
 *	Internal version of recno delete, called by CDB___qam_delete and
 *	CDB___qam_c_del.
 */
static int
CDB___qam_i_delete(dbc)
	DBC *dbc;
{
	QUEUE_CURSOR *cp;
	DB *dbp;
	DBT data;
	DB_LOCK lock;
	PAGE *pagep;
	QAMDATA *qp;
	db_recno_t start;
	int exact, ret, t_ret;

	dbp = dbc->dbp;
	cp = dbc->internal;
	start = cp->start;
	ret = 0;

	/*
	 * If this is CDB and this isn't a write cursor, then it's an error.
	 * If it is a write cursor, but we don't yet hold the write lock, then
	 * we need to upgrade to the write lock.
	 */
	if (F_ISSET(dbp->dbenv, DB_ENV_CDB)) {
		/* Make sure it's a valid update cursor. */
		if (!F_ISSET(dbc, DBC_WRITECURSOR | DBC_WRITER))
			return (EINVAL);

		if (F_ISSET(dbc, DBC_WRITECURSOR) &&
		    (ret = CDB_lock_get(dbp->dbenv, dbc->locker,
		    DB_LOCK_UPGRADE, &dbc->lock_dbt, DB_LOCK_WRITE,
		    &dbc->mylock)) != 0)
			return (EAGAIN);
	}

	if ((ret = CDB___db_lget(dbc,
	    0, cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0)
		goto err;

	cp->lock_mode = DB_LOCK_WRITE;
	/* Find the record ; delete only deletes exact matches. */
	if ((ret = CDB___qam_position(dbc,
	    &cp->recno, DB_LOCK_WRITE, start, &exact)) != 0) {
		cp->lock = lock;
		goto err;
	}
	if (!exact) {
		ret = DB_NOTFOUND;
		goto err1;
	}

	pagep = cp->page;
	qp = QAM_GET_RECORD(dbp, pagep, cp->indx);

	if (DB_LOGGING(dbc)) {
		data.size = ((QUEUE *)dbp->q_internal)->re_len;
		data.data = qp->data;
		if ((ret = CDB___qam_del_log(dbp->dbenv, dbc->txn,
		    &LSN(pagep), 0, dbp->log_fileid, &LSN(pagep),
		    pagep->pgno, cp->indx, cp->recno)) != 0)
			goto err1;
	}

	F_CLR(qp, QAM_VALID);

err1:
	if ((t_ret = CDB_memp_fput(
	    dbp->mpf, cp->page, ret == 0 ? DB_MPOOL_DIRTY : 0)) != 0)
		return (ret ? ret : t_ret);
	/* Doing record locking, release the page lock */
	if ((t_ret = __LPUT(dbc, cp->lock)) != 0) {
		cp->lock = lock;
		return (ret ? ret : t_ret);
	}
	cp->lock = lock;
	return (ret);

err:	if (F_ISSET(dbp->dbenv, DB_ENV_CDB) && F_ISSET(dbc, DBC_WRITECURSOR))
		(void)CDB___lock_downgrade(
		    dbp->dbenv, &dbc->mylock, DB_LOCK_IWRITE, 0);
	return (ret);
}

/*
 * CDB___qam_delete --
 *	Queue db->del function.
 *
 * PUBLIC: int CDB___qam_delete __P((DB *, DB_TXN *, DBT *, u_int32_t));
 */
int
CDB___qam_delete(dbp, txn, key, flags)
	DB *dbp;
	DB_TXN *txn;
	DBT *key;
	u_int32_t flags;
{
	QUEUE_CURSOR *cp;
	DBC *dbc;
	db_recno_t total;
	int ret, t_ret;

	PANIC_CHECK(dbp->dbenv);

	/* Check for invalid flags. */
	if ((ret =
	    CDB___db_delchk(dbp, key, flags, F_ISSET(dbp, DB_AM_RDONLY))) != 0)
		return (ret);

	/* Acquire a cursor. */
	if ((ret = dbp->cursor(dbp, txn, &dbc, DB_WRITELOCK)) != 0)
		return (ret);

	DEBUG_LWRITE(dbc, txn, "qam_delete", key, NULL, flags);

	cp = dbc->internal;
	if ((ret = CDB___qam_getno(dbp, key, &cp->recno)) != 0)
		goto err;
	CDB___qam_nrecs(dbc, &total, &cp->start);

	/* Do the delete. */
	if (cp->recno > total) {
		ret = DB_NOTFOUND;
		goto err;
	}

	ret = CDB___qam_i_delete(dbc);

	/* Release the cursor. */
err:	if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
		ret = t_ret;

	return (ret);
}

/*
 * CDB___qam_c_del --
 *	Qam cursor->c_del function
 */
static int
CDB___qam_c_del(dbc, flags)
	DBC *dbc;
	u_int32_t flags;
{
	QUEUE_CURSOR *cp;
	DB *dbp;
	db_recno_t total;
	int ret;

	dbp = dbc->dbp;
	cp = dbc->internal;

	PANIC_CHECK(dbp->dbenv);

	/* Check for invalid flags. */
	if ((ret = CDB___db_cdelchk(dbp, flags,
	    F_ISSET(dbp, DB_AM_RDONLY), cp->recno != RECNO_OOB)) != 0)
		return (ret);

	DEBUG_LWRITE(dbc, dbc->txn, "qam_c_del", NULL, NULL, flags);

	if ((ret = CDB___qam_nrecs(dbc, &total, &cp->start)) != 0)
		return (ret);
	return (CDB___qam_i_delete(dbc));
}


/*
 * CDB___qam_c_get --
 *	Queue cursor->c_get function.
 */
static int
CDB___qam_c_get(dbc_orig, key, data, flags)
	DBC *dbc_orig;
	DBT *key, *data;
	u_int32_t flags;
{
	QUEUE_CURSOR *cp, *orig;
	DB *dbp;
	DB_LOCK lock, pglock, metalock, save_lock;
	DBC *dbc;
	PAGE *pg;
	QAMDATA *qp;
	QMETA *meta;
	db_indx_t save_indx;
	db_lockmode_t lock_mode;
	db_pgno_t metapno, save_page;
	db_recno_t start, first, skipped, save_recno;
	int exact, is_first, locked, ret, t_ret, with_delete;
	int put_mode, meta_dirty;

	orig = dbc_orig->internal;
	dbp = dbc_orig->dbp;

	PANIC_CHECK(dbp->dbenv);

	with_delete = 0;
	lock_mode = DB_LOCK_READ;
	put_mode = 0;
	t_ret = 0;

	/* Check for invalid flags. */
	if ((ret = CDB___db_cgetchk(dbp,
	    key, data, flags, orig->recno != RECNO_OOB)) != 0)
		return (ret);

	/* Clear OR'd in additional bits so we can check for flag equality. */
	if (LF_ISSET(DB_RMW)) {
		lock_mode = DB_LOCK_WRITE;
		LF_CLR(DB_RMW);
	}

	if (flags == DB_CONSUME) {
		with_delete = 1;
		flags = DB_FIRST;
		lock_mode = DB_LOCK_WRITE;
	}

	DEBUG_LREAD(dbc_orig, dbc_orig->txn, "qam_c_get",
	    flags == DB_SET || flags == DB_SET_RANGE ? key : NULL, NULL, flags);

	/* Get a copy of the original cursor, including position. */
	if ((ret = dbc_orig->c_dup(dbc_orig, &dbc, DB_POSITIONI)) != 0)
		return (ret);
	cp = dbc->internal;

	is_first = 0;

	/* get the meta page */
	metapno = ((QUEUE *)dbp->q_internal)->q_meta;
	if ((ret = CDB___db_lget(dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
		return (ret);
	locked = 1;
	if ((ret = CDB_memp_fget(dbp->mpf, &metapno, 0, &meta)) != 0) {
		/* We did not fetch it, we can release the lock. */
		(void)__LPUT(dbc, metalock);
		return (ret);
	}

	skipped = 0;

	/* Make lint and friends happy. */
	first = 0;
	meta_dirty = 0;

	/* Release any previous lock if not in a transaction. */
	if (cp->lock.off != LOCK_INVALID) {
		(void)__TLPUT(dbc, cp->lock);
		cp->lock.off = LOCK_INVALID;
	}

retry:	/* Update the record number. */
	cp->start = start = meta->start;
	switch (flags) {
	case DB_CURRENT:
		break;
	case DB_NEXT:
		if (cp->recno != RECNO_OOB) {
			++cp->recno;
			break;
		}
		/* FALLTHROUGH */
	case DB_FIRST:
		flags = DB_NEXT;
		is_first = 1;

		/* get the first record number */
		cp->recno = first = meta->first_recno;

		/* if we will delete it, then increment */
		if (with_delete && first < meta->cur_recno) {
			if (DB_LOGGING(dbc))
				CDB___qam_incfirst_log(dbp->dbenv, dbc->txn,
					&LSN(meta), 0,
					dbp->log_fileid, first);
			meta->first_recno++;
			meta_dirty = 1;
		}

		break;
	case DB_PREV:
		if (cp->recno != RECNO_OOB) {
			if (cp->recno <= meta->first_recno) {
				ret = DB_NOTFOUND;
				goto err;
			}
			--cp->recno;
			break;
		}
		/* FALLTHROUGH */
	case DB_LAST:
		cp->recno = meta->cur_recno;
		if (cp->recno == 0) {
			ret = DB_NOTFOUND;
			goto err;
		}
		break;
	case DB_SET:
	case DB_SET_RANGE:
		if ((ret = CDB___qam_getno(dbp, key, &cp->recno)) != 0)
			goto err;
		break;
	}

	if (cp->recno > meta->cur_recno || cp->recno < start) {
		ret = DB_NOTFOUND;
		pg = NULL;
		if (skipped)
			goto undo_meta;
		goto err;
	}

	/* Don't hold the meta page long term. */
	if (locked) {
		if ((ret = __LPUT(dbc, metalock)) != 0)
			goto err;
		locked = 0;
	}

	/* Lock the record. */
	if ((ret = CDB___db_lget(dbc, 0, cp->recno, lock_mode,
	    with_delete ? DB_LOCK_NOWAIT | DB_LOCK_RECORD : DB_LOCK_RECORD,
	    &lock)) == DB_LOCK_NOTGRANTED) {
		/*
		 * In the DB_CONSUME case we skip the locked
		 * record, someone else will pick it up.
		 *
		 */
		is_first = 0;
		if (skipped == 0)
			skipped = cp->recno;
		goto retry;
	}

	if (ret != 0)
		goto err;

	/*
	 * In the DB_FIRST or DB_LAST cases we must wait and then start over
	 * since the first/last may have moved while we slept.
	 * We release our locks and try again.
	 */
	if ((!with_delete && is_first) || flags == DB_LAST) {
		if ((ret =
		    CDB___db_lget(dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
			goto err;
		if (cp->recno !=
		    (is_first ? meta->first_recno : meta->cur_recno)) {
			__LPUT(dbc, lock);
			if (is_first)
				flags = DB_FIRST;
			locked = 1;
			goto retry;
		}
		/* Don't hold the meta page long term. */
		if ((ret = __LPUT(dbc, metalock)) != 0)
			goto err;
	}

	/* Position the cursor on the record. */
	if ((ret =
	    CDB___qam_position(dbc, &cp->recno, lock_mode, start, &exact)) != 0) {
		/* We cannot get the page, release the record lock. */
		(void)__LPUT(dbc, lock);
		goto err;
	}

	pg = cp->page;
	pglock = cp->lock;
	cp->lock = lock;
	cp->lock_mode = lock_mode;

	if (!exact) {
		if (flags == DB_NEXT || flags == DB_PREV || flags == DB_LAST) {
			/* Release locks and try again. */
			(void)CDB_memp_fput(dbp->mpf, cp->page, 0);
			(void)__LPUT(dbc, pglock);
			(void)__LPUT(dbc, cp->lock);
			cp->lock.off = LOCK_INVALID;
			if (flags == DB_LAST)
				flags = DB_PREV;
			if (!with_delete)
				is_first = 0;
			goto retry;
		}
		/* this is for the SET and SET_RANGE cases */
		ret = DB_KEYEMPTY;
		goto err1;
	}

	/* Return the key if the user didn't give us one. */
	if (flags != DB_SET &&
	    (ret = CDB___db_retcopy(dbp, key, &cp->recno, sizeof(cp->recno),
	    &dbc->rkey.data, &dbc->rkey.ulen)) != 0) {
		if (with_delete)
			goto undo_meta;
		else
			goto err1;
	}

	qp = QAM_GET_RECORD(dbp, pg, cp->indx);

	/* Return the data item. */
	if ((ret = CDB___db_retcopy(dbp, data, qp->data,
	    ((QUEUE *)dbp->q_internal)->re_len,
	    &dbc->rdata.data, &dbc->rdata.ulen)) != 0) {
		if (with_delete)
			goto undo_meta;
		else
			goto err1;
	}

	/* Finally, if we are doing DB_CONSUME mark the record. */
	if (with_delete) {
		if (DB_LOGGING(dbc))
			if ((ret = CDB___qam_del_log(dbp->dbenv, dbc->txn,
			    &LSN(pg), 0, dbp->log_fileid, &LSN(pg),
			    pg->pgno, cp->indx, cp->recno)) != 0)
				goto undo_meta;

		F_CLR(qp, QAM_VALID);
		put_mode = DB_MPOOL_DIRTY;

		/*
		 * This code is responsible for correcting metadata.
		 * There are 3 cases.
		 *	1) We moved ahead more than one record.
		 *	2) We did not actually delete cp->recno.
		 *	3) We encountered at least one locked.
		 *		record and skipped them.
		 */
		if (cp->recno != first) {
			if (0) {
undo_meta:			is_first = 0;
			}
			if (locked == 0 && (t_ret = CDB___db_lget(
			    dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
				goto err1;

			if (is_first) {
			/*
			 * Check to see if we moved past the first record,
			 * if so update meta so others can start past the
			 * deleted records.
			 */
				if (meta->first_recno > first) {
					meta->first_recno = cp->recno;
					meta_dirty = 1;
				}
			}
			else if (skipped == 0) {
				/*
				 * Error case: we did not actually delete the
				 * record, restore meta_first so that it is at
				 * least at or before cp->recno
				 */
				if (meta->first_recno > cp->recno) {
					meta->first_recno = cp->recno;
					meta_dirty = 1;
				}
			}
			else if (meta->first_recno > skipped) {
				/*
				 * We skipped some records because they were
				 * locked. If the meta-data page reflects a
				 * starting pointer after the skipped records
				 * we need to move it back to the first record
				 * that is not deleted or is sill locked.
				 * Release locks as we go, we are only
				 * reading to optimize future fetches.
				 */
				first = meta->first_recno;
				/* Don't hold the meta page long term. */
				__LPUT(dbc, metalock);
				locked = 0;

				/* reverify the skipped record */
				save_page = cp->pgno;
				save_indx = cp->indx;
				save_recno = cp->recno;
				save_lock = cp->lock;
				do {
					t_ret = CDB___db_lget(dbc, 0, skipped,
					    DB_LOCK_READ,
					    DB_LOCK_NOWAIT | DB_LOCK_RECORD,
					    &lock);
					if (t_ret == DB_LOCK_NOTGRANTED)
						break;
					if (t_ret != 0)
						goto err1;
					if ((t_ret =
					    CDB___qam_position(dbc, &skipped,
					    DB_LOCK_READ,
					    start, &exact)) != 0) {
						(void)__LPUT(dbc, lock);
						goto err1;
					}
					if ((t_ret = CDB_memp_fput(dbp->mpf,
					     cp->page, put_mode)) != 0)
						goto err1;
					if ((t_ret =__LPUT(dbc, lock)) != 0)
						goto err1;
					if ((t_ret =
					    __LPUT(dbc, cp->lock)) != 0)
						goto err1;
					if (exact)
						break;
				} while (++skipped <= first);

				t_ret = 0;
				if ((t_ret = CDB___db_lget(
				    dbc, 0, metapno,
				    lock_mode, 0, &metalock)) != 0)
					goto err1;

				if (meta->first_recno > skipped) {
					meta->first_recno = skipped;
					meta_dirty = 1;
				}
				cp->pgno = save_page;
				cp->indx = save_indx;
				cp->recno = save_recno;
				cp->lock = save_lock;
			}
			locked = 1;
		}
	}

	/*
	 * Swap the cursors so we are left with the new position inside of
	 * the original DBCs structure, and close the dup'd cursor once it
	 * references the old position.
	 *
	 * The close can fail, but we only expect DB_LOCK_DEADLOCK failures.
	 * This violates our "the cursor is unchanged on error" semantics,
	 * but since all you can do with a DB_LOCK_DEADLOCK failure is close
	 * the cursor, I believe that's OK.
	 */
	orig = dbc_orig->internal;
	dbc_orig->internal = dbc->internal;
	dbc->internal = orig;


err1:
	if (pg != NULL) {
		if (!ret)
			ret = t_ret;
		t_ret = CDB_memp_fput(dbp->mpf, pg, put_mode);

		if (!ret)
			ret = t_ret;
		/* Doing record locking, release the page lock */
		t_ret = __LPUT(dbc, pglock);
	}

err:
	if (meta) {
		if (!ret)
			ret = t_ret;

		/* release the meta page */
		t_ret = CDB_memp_fput(
		    dbp->mpf, meta, meta_dirty ? DB_MPOOL_DIRTY : 0);

		if (!ret)
			ret = t_ret;

		/* Don't hold the meta page long term. */
		if (locked)
			t_ret = __LPUT(dbc, metalock);
	}

	if (!ret)
		ret = t_ret;

	t_ret = dbc->c_close(dbc);

	return (ret ? ret : t_ret);
}

/*
 * CDB___qam_c_close --
 *	Close down the cursor from a single use.
 */
static int
CDB___qam_c_close(dbc)
	DBC *dbc;
{
	QUEUE_CURSOR *cp;

	cp = dbc->internal;

	/* Discard any locks not acquired inside of a transaction. */
	if (cp->lock.off != LOCK_INVALID) {
		(void)__TLPUT(dbc, cp->lock);
		cp->lock.off = LOCK_INVALID;
	}

	cp->page = NULL;
	cp->pgno = PGNO_INVALID;
	cp->indx = 0;
	cp->lock.off = LOCK_INVALID;
	cp->lock_mode = DB_LOCK_NG;
	cp->recno = RECNO_OOB;
	cp->flags = 0;

	return (0);
}

/*
 * CDB___qam_c_dup --
 *	Duplicate a queue cursor, such that the new one holds appropriate
 *	locks for the position of the original.
 *
 * PUBLIC: int CDB___qam_c_dup __P((DBC *, DBC *));
 */
int
CDB___qam_c_dup(orig_dbc, new_dbc)
	DBC *orig_dbc, *new_dbc;
{
	QUEUE_CURSOR *orig, *new;

	orig = orig_dbc->internal;
	new = new_dbc->internal;

	new->pgno = orig->pgno;
	new->indx = orig->indx;

	new->recno = orig->recno;
	new->start = orig->start;

	new->lock_mode = orig->lock_mode;

	/* reget the long term lock if we are not in a xact */
	if (orig_dbc->txn != NULL || orig->lock.off == LOCK_INVALID)
		return (0);

	return (CDB___db_lget(new_dbc,
	    0, new->recno, new->lock_mode, DB_LOCK_RECORD, &new->lock));
}

/*
 * CDB___qam_c_init
 *
 * PUBLIC: int CDB___qam_c_init __P((DBC *));
 */
int
CDB___qam_c_init(dbc)
	DBC *dbc;
{
	QUEUE_CURSOR *cp;
	DB *dbp;
	int ret;

	dbp = dbc->dbp;

	/* Allocate the internal structure. */
	if ((ret = CDB___os_calloc(1, sizeof(QUEUE_CURSOR), &cp)) != 0)
		return (ret);

	/*
	 * Logical record numbers are always the same size, and we don't want
	 * to have to check for space every time we return one.  Allocate it
	 * in advance.
	 */
	if ((ret =
	    CDB___os_malloc(sizeof(db_recno_t), NULL, &dbc->rkey.data)) != 0) {
		CDB___os_free(cp, sizeof(QUEUE_CURSOR));
		return (ret);
	}
	dbc->rkey.ulen = sizeof(db_recno_t);

	/* Initialize methods. */
	dbc->internal = cp;
	dbc->c_del = CDB___qam_c_del;
	dbc->c_get = CDB___qam_c_get;
	dbc->c_put = CDB___qam_c_put;
	dbc->c_am_close = CDB___qam_c_close;
	dbc->c_am_destroy = CDB___qam_c_destroy;

	return (0);
}
/*
 * CDB___qam_c_destroy --
 *	Close a single cursor -- internal version.
 */
static int
CDB___qam_c_destroy(dbc)
	DBC *dbc;
{
	/* Discard the structures. */
	CDB___os_free(dbc->internal, sizeof(QUEUE_CURSOR));

	return (0);
}

/*
 * CDB___qam_getno --
 *	Check the user's record number.
 */
static int
CDB___qam_getno(dbp, key, rep)
	DB *dbp;
	const DBT *key;
	db_recno_t *rep;
{
	if ((*rep = *(db_recno_t *)key->data) == 0) {
		CDB___db_err(dbp->dbenv, "illegal record number of 0");
		return (EINVAL);
	}
	return (0);
}