Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F120834289
cyrusdb_twoskip.c
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Authored By
Unknown
Size
72 KB
Referenced Files
None
Subscribers
None
cyrusdb_twoskip.c
View Options
/* cyrusdb_twoskip.c - brand new twoskip implementation, not backwards anything
*
* Copyright (c) 1994-2008 Carnegie Mellon University. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The name "Carnegie Mellon University" must not be used to
* endorse or promote products derived from this software without
* prior written permission. For permission or any legal
* details, please contact
* Carnegie Mellon University
* Center for Technology Transfer and Enterprise Creation
* 4615 Forbes Avenue
* Suite 302
* Pittsburgh, PA 15213
* (412) 268-7393, fax: (412) 268-7395
* innovation@andrew.cmu.edu
*
* 4. Redistributions of any form whatsoever must retain the following
* acknowledgment:
* "This product includes software developed by Computing Services
* at Carnegie Mellon University (http://www.cmu.edu/computing/)."
*
* CARNEGIE MELLON UNIVERSITY DISCLAIMS ALL WARRANTIES WITH REGARD TO
* THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
* AND FITNESS, IN NO EVENT SHALL CARNEGIE MELLON UNIVERSITY BE LIABLE
* FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN
* AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING
* OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#include
<config.h>
#include
<errno.h>
#include
<limits.h>
#include
<stdlib.h>
#include
<string.h>
#include
<syslog.h>
#include
<sys/types.h>
#ifdef HAVE_UNISTD_H
#include
<unistd.h>
#endif
#include
"assert.h"
#include
"bsearch.h"
#include
"byteorder.h"
#include
"cyrusdb.h"
#include
"crc32.h"
#include
"libcyr_cfg.h"
#include
"mappedfile.h"
#include
"util.h"
#include
"xmalloc.h"
/*
* twoskip disk format.
*
* GOALS:
* a) 64 bit through
* b) Fast recovery after crashes
* c) integrity checks throughout
* d) simple format
*
* ACHIEVED BY:
* a)
* - 64 bit offsets for all values
* - smaller initial keylen and vallen, but they
* can be extended up to 64 bits as well.
* - no timestamps stored in the file.
* - XXX - may behave strangely with large files on
* 32 bit architectures, particularly if size_t is
* not 64 bit.
*
* b)
* - "dirty flag" is always set in the header and
* fsynced BEFORE writing anything else.
* - a header field for "current size", after which
* all changes are considered suspect until commit.
* - two "lowest level" offsets, used in alternating
* order, so the highest value less than "current_size"
* is always the correct pointer - this means we
* never lose linkage, so never need to rewrite more
* than the affected records during a recovery.
* - all data is fsynced BEFORE rewriting the header to
* remove the dirty flag.
* - As long as the first 64 bytes of the file are
* guaranteed to write all together or not at all,
* we're crash-safe.
*
* c)
* - every byte in the file is covered by one of the
* crc32 values stored throughout.
* - header CRC is checked on every header read (open/lock)
* - record head CRCs are checked on every record read,
* including skiplist traverse.
* record tail CRCs (key/value) are checked on every exact
* key match result, during traverse for read or write.
*
* d)
* - there are no special commit, inorder, etc records.
* just add records and ghost "delete" records to give
* somewhere to point to on deletes. These are only
* at the lowest level, so don't have a significant
* seek impact.
* - modular code makes the logic much clearer.
*/
/*
* FORMAT:
*
* HEADER: 64 bytes
* magic: 20 bytes: "4 bytes same as skiplist" "twoskip file\0\0\0\0"
* version: 4 bytes
* generation: 8 bytes
* num_records: 8 bytes
* repack_size: 8 bytes
* current_size: 8 bytes
* flags: 4 bytes
* crc32: 4 bytes
*
* RECORDS:
* type 1 byte
* level: 1 byte
* keylen: 2 bytes
* vallen: 4 bytes
* <optionally: 64 bit keylen if keylen == UINT16_MAX>
* <optionally: 64 bit vallen if vallen == UINT32_MAX>
* ptrs: 8 bytes * (level+1)
* crc32_head: 4 bytes
* crc32_tail: 4 bytes
* key: (keylen bytes)
* val: (vallen bytes)
* padding: enough zeros to round up to an 8 byte multiple
*
* defined types, in skiplist language are:
* '=' -> DUMMY
* '+' -> ADD/INORDER
* '-' -> DELETE (kind of)
* '$' -> COMMIT
* but note that delete records behave differently - they're
* part of the pointer hierarchy, so that back offsets will
* always point somewhere past the 'end' until commit.
*
* The DUMMY is always MAXLEVEL level, with zero keylen and vallen
* The DELETE is always zero level, with zero keylen and vallen
* crc32_head is calculated on all bytes before it in the record
* crc32_tail is calculated on all bytes after, INCLUDING padding
*
* The COMMIT is inserted at the end of each transaction, and its
* single pointer points back to the start of the transaction.
*/
/* OPERATION:
*
* Finding a record works very much like skiplist, but we have
* a datastructure, 'struct skiploc', to help find it. There
* is one of these embedded directly in the 'struct db', and
* it's the only one we ever use.
*
* skiploc contains two complete sets of offsets - at every
* level the offset of the previous record, and the offset of
* the next record, in relation to the requested key. If the
* key is an exact match, it also contains a copy of the
* struct skiprecord. If not, it contains the struct
* skiprecord for the previous record at level zero.
*
* It also contains a 'struct buf' with a copy of the requested
* key, which allows for efficient relocation of the position in
* the file when nothing is changed.
*
* So nothing is really changed with finding, except the special
* "level zero" alternative pointer. We'll see that in action
* later.
*
* TRANSACTIONS:
* 1) before writing anything else, the header is updated with the
* DIRTY flag set, and then fdatasync is run.
* 2) after all changes, fdatasync is run again.
* 3) finally, the header is updated with a new current_size and
* the DIRTY flag clear, then fdatasync is run for a third time.
*
* ADDING A NEW RECORD:
* a new record is created with forward locations pointing to the
* next pointers in the skiploc. This is appended to the file.
* This works for either a create OR replace, since in both cases
* the nextlocs are correct. Level zero is set to zero on a new
* record.
*
* If it's not a replace, a "random" level will be chosen for the
* record. All update operations below apply only up to this level,
* pointers above are unaffected - and continue over the location
* of this change.
*
* For each backloc, the record at that offset is read, and the
* forward pointers at each level are replaced with the offset
* of the new record. NOTE: see special level zero logic below.
*
* Again, if this was a replace, the backlocs don't point to the
* current record, so it just silently disappears from the lists.
*
* DELETING A RECORD:
* The logic is almost identical to adding, but a delete record
* always has a level of zero, with only a single pointer forward
* to the next record.
*
* Because of this, the updates are done up to the level of the
* old record instead.
*
* THE SPECIAL "level zero":
* To allow a "fast fsck" after an aborted transaction, rather
* than having only a single level 1 pointer, we have two. The
* following algorithm is used to select which one to change.
*
* The definition of "current_size" is the size of the database
* at last commit, it does not get updated during a transaction.
*
* So: when updating level 1 - if either level 1 or level 0 has
* a value >= current_size, then that value gets updated again.
* otherwise, the lowest value gets replaced with the new value.
*
* when reading, the highest value is used - except during
* recovery when it's the highest value less than current_size,
* since any "future" offsets are bogus.
*
* This means that there is always at least one offset which
* points to the "next" record as if the current transaction
* had never occurred - allowing recovery to find all alive
* records without scanning and updating the rest of the file.
* This guarantee exists regardless of any ordering of writes
* within the transaction, any page could be inconsistent and
* the result is still a clean recovery.
*
* CHECKPOINT:
* Over time, a twoskip database accumulates cruft - replaced
* records and delete records. Records out of order, slowing
* down sequential access. When the size at last repack
* is sufficiently smaller than the current size (see the
* TUNING constants below) then the file is checkpointed.
* A checkpoint is achieved by creating a new file, and
* copying all the current records, in order, into it, then
* renaming the new file over the old. The "generation"
* counter in the header is incremented to tell other users
* that offsets into the file are no longer valid. This is
* more reliable than just using the inode, because inodes
* can be reused.
*
* LOCATION OPTIMISATION:
* If the generation is unchanged AND the size of the file
* is unchanged, then all offsets stored in the skiploc are
* still valid. This is used to optimise finding the current
* key, advancing to the "next" key, and also to optimise
* regular fetches that happen to hit either the current key,
* the gap immediately after, or the next key. All other
* locations cause a full relocate.
*/
/********** TUNING *************/
/* don't bother rewriting if the database has less than this much data */
#define MINREWRITE 16834
/* number of skiplist levels - 31 gives us binary search to 2^32 records.
* limited to 255 by file format, but skiplist had 20, and that was enough
* for most real uses. 31 is heaps. */
#define MAXLEVEL 31
/* should be 0.5 for binary search semantics */
#define PROB 0.5
/* release lock in foreach at least every N records */
#define FOREACH_LOCK_RELEASE 256
/* format specifics */
#undef VERSION
/* defined in config.h */
#define VERSION 1
/* type aliases */
#define LLU long long unsigned int
#define LU long unsigned int
/* record types */
#define DUMMY '='
#define RECORD '+'
#define DELETE '-'
#define COMMIT '$'
/* magic 8-byte pad - space for type, easy to read on hexdump
* and with both low and high bits to be unlikely in real data */
#define BLANK " BLANK\x07\xa0"
/********** DATA STRUCTURES *************/
/* A single "record" in the twoskip file. This could be a
* DUMMY, a KEYRECORD, a VALRECORD or even a DELETE - they
* all read and write with the same functions */
/* A single record as parsed from (or prepared for) the twoskip file.
 * All record kinds - DUMMY, RECORD, DELETE, COMMIT - share this shape
 * and go through the same read/write functions. */
struct skiprecord {
    /* where this record lives on disk (not part of the on-disk format) */
    size_t offset;
    size_t len;

    /* on-disk header fields */
    uint8_t type;
    uint8_t level;
    size_t keylen;
    size_t vallen;

    /* forward pointers: one per level, plus the extra alternating
     * "level zero" slot (see _getloc/_setloc) */
    size_t nextloc[MAXLEVEL+1];

    /* integrity checks over the head and the key/value tail */
    uint32_t crc32_head;
    uint32_t crc32_tail;

    /* offsets of the key and value bytes within the mapped file */
    size_t keyoffset;
    size_t valoffset;
};
/* a location in the twoskip file. We always have:
* record: if "is_exactmatch" this points to the record
* with the matching key, otherwise it points to
* the 'compar' order previous record.
* backloc: the records that point TO this location
* at each level. If is_exactmatch, they
* point to the record, otherwise they are
* the record.
* forwardloc: the records pointed to by the record
* at 'backloc' at the same level. Kept
* here for efficiency
* keybuf: a copy of the requested key - we always keep
* this so we can re-seek after the file has been
* checkpointed under us (say a read-only foreach)
*
* generation and end can be used to see if anything in
* the file may have changed and needs re-reading.
*/
/* A cursor into the twoskip file: the record at (or immediately
 * before) a requested key, plus the full set of back/forward offsets
 * needed to splice a record in or out cheaply.
 *
 * keybuf holds a copy of the requested key so the position can be
 * re-found after the file changes underneath us; generation and end
 * tell us whether the cached offsets are still valid. */
struct skiploc {
    /* the key that was requested - may not match the actual record */
    struct buf keybuf;
    int is_exactmatch;

    /* the matching record if is_exactmatch, else the previous record */
    struct skiprecord record;

    /* both sets of offsets, kept so inserts don't need a re-walk */
    size_t backloc[MAXLEVEL+1];
    size_t forwardloc[MAXLEVEL+1];

    /* validity markers for the cached offsets */
    uint64_t generation;
    size_t end;
};
#define DIRTY (1<<0)
/* An open transaction on a twoskip database.
 * NOTE(review): the original comment mentions a "logstart" field
 * (commit start / abort truncate point) that is not present here -
 * presumably tracked elsewhere; confirm against the rest of the file. */
struct txn {
    /* transaction sequence number, matched against db->txn_num */
    int num;
    /* non-zero if this is a shared (read-only) transaction */
    int shared;
};
/* In-memory copy of the 64-byte on-disk header (see FORMAT above). */
struct db_header {
    uint32_t version;       /* file format version */
    uint32_t flags;         /* e.g. DIRTY */
    uint64_t generation;    /* bumped on checkpoint; invalidates offsets */
    uint64_t num_records;   /* live record count */
    size_t repack_size;     /* file size at last repack */
    size_t current_size;    /* committed size; later bytes are suspect */
};
/* One open twoskip database. */
struct dbengine {
    /* the mapped file and our cached view of its header */
    struct mappedfile *mf;
    struct db_header header;
    struct skiploc loc;     /* the single shared cursor */

    /* tracking info */
    int is_open;
    size_t end;             /* known end of valid data */
    int txn_num;            /* sequence number for transactions */
    struct txn *current_txn;

    /* open flags and the key comparator used for ordering */
    int open_flags;
    int (*compar)(const char *s1, int l1, const char *s2, int l2);
};
/* Node in the linked list of currently-open databases, used to share
 * a single dbengine between multiple openers via refcounting. */
struct db_list {
    struct dbengine *db;
    struct db_list *next;
    int refcount;
};
#define HEADER_MAGIC ("\241\002\213\015twoskip file\0\0\0\0")
#define HEADER_MAGIC_SIZE (20)
/* offsets of header files */
/* Byte offsets of each field within the 64-byte file header. */
enum {
    OFFSET_HEADER       = 0,
    OFFSET_VERSION      = 20,
    OFFSET_GENERATION   = 24,
    OFFSET_NUM_RECORDS  = 32,
    OFFSET_REPACK_SIZE  = 40,
    OFFSET_CURRENT_SIZE = 48,
    OFFSET_FLAGS        = 56,
    OFFSET_CRC32        = 60,
};
#define HEADER_SIZE 64
#define DUMMY_OFFSET HEADER_SIZE
#define MAXRECORDHEAD ((MAXLEVEL + 5)*8)
// NOTE: MAXLEVEL should be chosen so that MAXRECORDHEAD always
// fits within BLOCKSIZE so header rewrites are atomic.
#define BLOCKSIZE 512
/* mount a scratch monkey */
/* Shared scratch buffer for assembling record heads and the file
 * header; the union forces 8-byte alignment ("mount a scratch monkey"). */
static union skipwritebuf {
    uint64_t align;
    char s[MAXRECORDHEAD];
} scratchspace;
/* list of databases currently open in this process */
static struct db_list *open_twoskip = NULL;

/* forward declarations */
static int mycommit(struct dbengine *db, struct txn *tid);
static int myabort(struct dbengine *db, struct txn *tid);
static int mycheckpoint(struct dbengine *db);
static int myconsistent(struct dbengine *db, struct txn *tid);
static int recovery(struct dbengine *db);
static int recovery1(struct dbengine *db, int *count);
static int recovery2(struct dbengine *db, int *count);
/************** HELPER FUNCTIONS ****************/
#define BASE(db) mappedfile_base((db)->mf)
#define KEY(db, rec) (BASE(db) + (rec)->keyoffset)
#define VAL(db, rec) (BASE(db) + (rec)->valoffset)
#define SIZE(db) mappedfile_size((db)->mf)
#define FNAME(db) mappedfile_fname((db)->mf)
/* calculate padding size */
/* Round record_size up to the next multiple of howfar (used for the
 * 8-byte padding of key/value data). Returns the value unchanged if
 * it is already a multiple. */
static size_t roundup(size_t record_size, int howfar)
{
    size_t excess = record_size % howfar;
    return excess ? record_size + (howfar - excess) : record_size;
}
/* choose a level appropriately randomly */
/* Pick a skiplist level for a new record: starting from lvl, keep
 * promoting with probability PROB per step, capped at maxlvl.
 * With PROB == 0.5 this gives the usual geometric distribution. */
static uint8_t randlvl(uint8_t lvl, uint8_t maxlvl)
{
    while (((float)rand() / (float)(RAND_MAX)) < PROB) {
        if (++lvl == maxlvl)
            break;
    }
    return lvl;
}
/************** HEADER ****************/
/* given an open, mapped db, read in the header information */
/* Parse and validate the header of an open, mapped db into db->header.
 * Checks the magic, the version, and (unless CYRUSDB_NOCRC) the header
 * CRC. Also resets db->end to the committed size. Returns 0 on
 * success or CYRUSDB_IOERROR on any validation failure. */
static int read_header(struct dbengine *db)
{
    uint32_t stored_crc;

    assert(db && db->mf && db->is_open);

    /* must at least contain a full header */
    if (SIZE(db) < HEADER_SIZE) {
        syslog(LOG_ERR, "twoskip: file not large enough for header: %s",
               FNAME(db));
        return CYRUSDB_IOERROR;
    }

    if (memcmp(BASE(db), HEADER_MAGIC, HEADER_MAGIC_SIZE)) {
        syslog(LOG_ERR, "twoskip: invalid magic header: %s", FNAME(db));
        return CYRUSDB_IOERROR;
    }

    db->header.version = ntohl(*((uint32_t *)(BASE(db) + OFFSET_VERSION)));
    if (db->header.version > VERSION) {
        syslog(LOG_ERR, "twoskip: version mismatch: %s has version %d",
               FNAME(db), db->header.version);
        return CYRUSDB_IOERROR;
    }

    db->header.generation =
        ntohll(*((uint64_t *)(BASE(db) + OFFSET_GENERATION)));
    db->header.num_records =
        ntohll(*((uint64_t *)(BASE(db) + OFFSET_NUM_RECORDS)));
    db->header.repack_size =
        ntohll(*((uint64_t *)(BASE(db) + OFFSET_REPACK_SIZE)));
    db->header.current_size =
        ntohll(*((uint64_t *)(BASE(db) + OFFSET_CURRENT_SIZE)));
    db->header.flags =
        ntohl(*((uint32_t *)(BASE(db) + OFFSET_FLAGS)));
    stored_crc =
        ntohl(*((uint32_t *)(BASE(db) + OFFSET_CRC32)));

    /* data past current_size is uncommitted */
    db->end = db->header.current_size;

    if ((db->open_flags & CYRUSDB_NOCRC))
        return 0;

    /* the CRC covers every header byte before the CRC field itself */
    if (crc32_map(BASE(db), OFFSET_CRC32) != stored_crc) {
        xsyslog(LOG_ERR, "DBERROR: twoskip header CRC failure",
                "filename=<%s>", FNAME(db));
        return CYRUSDB_IOERROR;
    }

    return 0;
}
/* given an open, mapped, locked db, write the header information */
/* Serialise db->header (plus a freshly computed CRC) into scratchspace
 * and write it to the start of the mapped file. The caller is
 * responsible for locking and for any fsync. Returns 0 or
 * CYRUSDB_IOERROR. */
static int write_header(struct dbengine *db)
{
    char *buf = scratchspace.s;

    /* assemble the full 64-byte header in one buffer */
    memcpy(buf, HEADER_MAGIC, HEADER_MAGIC_SIZE);
    *((uint32_t *)(buf + OFFSET_VERSION)) = htonl(db->header.version);
    *((uint64_t *)(buf + OFFSET_GENERATION)) = htonll(db->header.generation);
    *((uint64_t *)(buf + OFFSET_NUM_RECORDS)) = htonll(db->header.num_records);
    *((uint64_t *)(buf + OFFSET_REPACK_SIZE)) = htonll(db->header.repack_size);
    *((uint64_t *)(buf + OFFSET_CURRENT_SIZE)) = htonll(db->header.current_size);
    *((uint32_t *)(buf + OFFSET_FLAGS)) = htonl(db->header.flags);
    /* CRC over everything before the CRC field */
    *((uint32_t *)(buf + OFFSET_CRC32)) = htonl(crc32_map(buf, OFFSET_CRC32));

    /* single write at offset zero */
    if (mappedfile_pwrite(db->mf, buf, HEADER_SIZE, 0) < 0)
        return CYRUSDB_IOERROR;

    return 0;
}
/* simple wrapper to write with an fsync */
/* Write the header and, if that succeeded, fsync it to disk. */
static int commit_header(struct dbengine *db)
{
    int r = write_header(db);
    if (r) return r;
    return mappedfile_commit(db->mf);
}
/******************** RECORD *********************/
/* Verify the tail CRC of a record: the checksum over its key, value
 * and padding bytes. Skipped entirely when the db was opened with
 * CYRUSDB_NOCRC. Returns 0 on match, CYRUSDB_IOERROR on mismatch. */
static int check_tailcrc(struct dbengine *db, struct skiprecord *record)
{
    if ((db->open_flags & CYRUSDB_NOCRC))
        return 0;

    /* the tail covers key + value rounded up to the 8-byte boundary */
    uint32_t computed = crc32_map(BASE(db) + record->keyoffset,
                                  roundup(record->keylen + record->vallen, 8));
    if (computed != record->crc32_tail) {
        xsyslog(LOG_ERR, "DBERROR: invalid tail crc",
                "filename=<%s> offset=<%llX>",
                FNAME(db), (LLU)record->offset);
        return CYRUSDB_IOERROR;
    }

    return 0;
}
/* read a single skiprecord at the given offset */
#ifdef HAVE_DECLARE_OPTIMIZE
static int read_onerecord(struct dbengine *db, size_t offset,
                          struct skiprecord *record)
    __attribute__((optimize("-O3")));
#endif
/* Parse the skiprecord at 'offset' into *record, validating sizes and
 * (unless CYRUSDB_NOCRC) the head CRC. An offset of zero yields a
 * zeroed record and success. Returns 0 or CYRUSDB_IOERROR. */
static int read_onerecord(struct dbengine *db, size_t offset,
                          struct skiprecord *record)
{
    const char *ptr;
    int i;

    memset(record, 0, sizeof(struct skiprecord));

    if (!offset) return 0;

    record->offset = offset;
    record->len = 24; /* absolute minimum record size */

    /* need space for at least the header plus some details */
    if (record->offset + record->len > SIZE(db))
        goto badsize;

    /* fixed part of the record head */
    ptr = BASE(db) + offset;
    record->type = ptr[0];
    record->level = ptr[1];
    record->keylen = ntohs(*((uint16_t *)(ptr + 2)));
    record->vallen = ntohl(*((uint32_t *)(ptr + 4)));
    offset += 8;

    /* a level past MAXLEVEL would overflow nextloc[] */
    if (record->level > MAXLEVEL) {
        xsyslog(LOG_ERR, "DBERROR: twoskip invalid level",
                "filename=<%s> level=<%d> offset=<%08llX>",
                FNAME(db), record->level, (LLU)offset);
        return CYRUSDB_IOERROR;
    }

    /* extended 64-bit keylen follows if the short field is saturated */
    if (record->keylen == UINT16_MAX) {
        ptr = BASE(db) + offset;
        record->keylen = ntohll(*((uint64_t *)ptr));
        offset += 8;
    }

    /* likewise for an extended 64-bit vallen */
    if (record->vallen == UINT32_MAX) {
        ptr = BASE(db) + offset;
        record->vallen = ntohll(*((uint64_t *)ptr));
        offset += 8;
    }

    /* now the total length is known:
     * head-so-far + pointers + two CRCs + padded key/value */
    record->len = (offset - record->offset)
                + 8 * (1 + record->level)
                + 8
                + roundup(record->keylen + record->vallen, 8);

    if (record->offset + record->len > SIZE(db))
        goto badsize;

    /* one forward pointer per level, inclusive */
    for (i = 0; i <= record->level; i++) {
        ptr = BASE(db) + offset;
        record->nextloc[i] = ntohll(*((uint64_t *)ptr));
        offset += 8;
    }

    /* the two CRCs trail the pointers */
    ptr = BASE(db) + offset;
    record->crc32_head = ntohl(*((uint32_t *)ptr));
    record->crc32_tail = ntohl(*((uint32_t *)(ptr + 4)));

    record->keyoffset = offset + 8;
    record->valoffset = record->keyoffset + record->keylen;

    if ((db->open_flags & CYRUSDB_NOCRC))
        return 0;

    /* head CRC covers every byte before the CRC fields */
    uint32_t computed = crc32_map(BASE(db) + record->offset,
                                  (offset - record->offset));
    if (computed != record->crc32_head) {
        xsyslog(LOG_ERR, "DBERROR: twoskip checksum head error",
                "filename=<%s> offset=<%08llX>",
                FNAME(db), (LLU)offset);
        return CYRUSDB_IOERROR;
    }

    return 0;

badsize:
    syslog(LOG_ERR,
           "twoskip: attempt to read past end of file %s: %08llX > %08llX",
           FNAME(db),
           (LLU)record->offset + record->len,
           (LLU)SIZE(db));
    return CYRUSDB_IOERROR;
}
/* Read the record at 'offset', and if it turns out to be a DELETE
 * ghost record, follow its single level-0 pointer and read the record
 * it points at instead. Returns 0 or an error from read_onerecord. */
static int read_skipdelete(struct dbengine *db, size_t offset,
                           struct skiprecord *record)
{
    int r = read_onerecord(db, offset, record);
    if (r) return r;

    if (record->type != DELETE)
        return 0;

    /* skip over the ghost to whatever it points at */
    return read_onerecord(db, record->nextloc[0], record);
}
/* prepare the header part of the record (everything except the key, value
* and padding). Used for both writes and rewrites. */
/* Serialise a record's head (type, level, lengths, pointers and both
 * CRCs - everything except the key, value and padding) into 'buf',
 * computing and storing crc32_head as a side effect. crc32_tail must
 * already be set; it is copied out unchanged. *sizep receives the
 * number of bytes written. Used for both fresh writes and rewrites. */
static void prepare_record(struct skiprecord *record, char *buf, size_t *sizep)
{
    int pos = 8; /* past the fixed type/level/keylen/vallen block */
    int i;

    assert(record->level <= MAXLEVEL);

    buf[0] = record->type;
    buf[1] = record->level;

    /* short keylen, or UINT16_MAX sentinel plus a 64-bit extension */
    if (record->keylen < UINT16_MAX) {
        *((uint16_t *)(buf + 2)) = htons(record->keylen);
    }
    else {
        *((uint16_t *)(buf + 2)) = htons(UINT16_MAX);
        *((uint64_t *)(buf + pos)) = htonll(record->keylen);
        pos += 8;
    }

    /* short vallen, or UINT32_MAX sentinel plus a 64-bit extension */
    if (record->vallen < UINT32_MAX) {
        *((uint32_t *)(buf + 4)) = htonl(record->vallen);
    }
    else {
        *((uint32_t *)(buf + 4)) = htonl(UINT32_MAX);
        *((uint64_t *)(buf + pos)) = htonll(record->vallen);
        pos += 8;
    }

    /* forward pointers, one per level inclusive */
    for (i = 0; i <= record->level; i++) {
        *((uint64_t *)(buf + pos)) = htonll(record->nextloc[i]);
        pos += 8;
    }

    /* head CRC covers everything serialised so far;
     * NOTE: crc32_tail does not change */
    record->crc32_head = crc32_map(buf, pos);
    *((uint32_t *)(buf + pos)) = htonl(record->crc32_head);
    *((uint32_t *)(buf + pos + 4)) = htonl(record->crc32_tail);
    pos += 8;

    *sizep = pos;
}
/* only changing the record head, so only rewrite that much */
/* Rewrite just the head of an existing record in place (pointers may
 * have changed; key/value/tail are untouched). The transaction must
 * already be dirty. Returns 0 or CYRUSDB_IOERROR. */
static int rewrite_record(struct dbengine *db, struct skiprecord *record)
{
    char *head = scratchspace.s;
    size_t headlen;

    /* we must already be in a transaction before updating records */
    assert(db->header.flags & DIRTY);
    assert(record->offset);

    prepare_record(record, head, &headlen);

    if (mappedfile_pwrite(db->mf, head, headlen, record->offset) < 0)
        return CYRUSDB_IOERROR;

    return 0;
}
/* Add BLANK padding sets of 8 bytes until the entire header will write
* in a single disk block for safety.
* This ONLY works if the header is small enough obviously (should always be)
* the algorithm checks if the end % BLOCKSIZE is smaller than the start % BLOCKSIZE
* in which case we've wrapped a block */
/* Append 8-byte BLANK filler at db->end until a record head of
 * 'iolen' bytes written there would not straddle a BLOCKSIZE
 * boundary, so the head rewrite stays within one disk block.
 * A head that can never fit in a block is written unpadded.
 *
 * The test offsets by +8 at the start (the first 8 bytes of a head
 * are never rewritten) and -8 at the end (finishing exactly on a
 * boundary is fine); while the wrapped end position is smaller than
 * the start position, the head would cross a boundary, so pad. */
static int add_padding(struct dbengine *db, size_t iolen)
{
    if (iolen > BLOCKSIZE - 8)
        return 0;

    while (((db->end + 8) % BLOCKSIZE) >
           ((db->end - 8 + iolen) % BLOCKSIZE)) {
        if (mappedfile_pwrite(db->mf, BLANK, 8, db->end) < 0)
            return CYRUSDB_IOERROR;
        db->end += 8;
    }

    return 0;
}
/* you can only write records at the end */
/* Append a complete new record (head + key + value + padding) at
 * db->end. record->offset must be zero on entry; on success the
 * record's offset/keyoffset/valoffset/len are filled in and db->end
 * is advanced. Returns 0 or CYRUSDB_IOERROR. */
static int write_record(struct dbengine *db, struct skiprecord *record,
                        const char *key, const char *val)
{
    static const char zeros[8] = {0};
    uint64_t kvlen;
    size_t headlen = 0;
    struct iovec io[4];
    int nwritten;
    int r;

    assert(!record->offset);

    /* slot 0 is the head - filled in after the tail CRC is known */
    io[0].iov_base = scratchspace.s;
    io[0].iov_len = 0;

    io[1].iov_base = (char *)key;
    io[1].iov_len = record->keylen;

    io[2].iov_base = (char *)val;
    io[2].iov_len = record->vallen;

    /* zero padding out to an 8-byte multiple */
    kvlen = record->vallen + record->keylen;
    io[3].iov_base = (char *)zeros;
    io[3].iov_len = roundup(kvlen, 8) - kvlen;

    /* tail CRC first: it covers key + value + padding */
    record->crc32_tail = crc32_iovec(io + 1, 3);

    /* now the head can be prepared (it embeds crc32_tail) */
    prepare_record(record, scratchspace.s, &headlen);
    io[0].iov_base = scratchspace.s;
    io[0].iov_len = headlen;

    /* keep the head within a single disk block for crash safety */
    r = add_padding(db, headlen);
    if (r) return r;

    /* single gathered write at the end of the file */
    nwritten = mappedfile_pwritev(db->mf, io, 4, db->end);
    if (nwritten < 0)
        return CYRUSDB_IOERROR;

    /* record where everything landed */
    record->offset = db->end;
    record->keyoffset = db->end + io[0].iov_len;
    record->valoffset = record->keyoffset + record->keylen;
    record->len = nwritten;

    /* and advance the known file size */
    db->end += nwritten;

    return 0;
}
/* helper to append a record, starting the transaction by dirtying the
* header first if required */
/* Append a record inside the current transaction, first dirtying and
 * fsyncing the header if this is the transaction's first write (the
 * DIRTY flag must hit disk before any record data does). */
static int append_record(struct dbengine *db, struct skiprecord *record,
                         const char *key, const char *val)
{
    assert(db->current_txn);

    if (!(db->header.flags & DIRTY)) {
        /* first change of this transaction: persist the dirty flag */
        db->header.flags |= DIRTY;
        int r = commit_header(db);
        if (r) return r;
    }

    return write_record(db, record, key, val);
}
/************************** LOCATION MANAGEMENT ***************************/
/* find the next record at a given level, encapsulating the
* level 0 magic */
#ifdef HAVE_DECLARE_OPTIMIZE
static size_t _getloc(struct dbengine *db, struct skiprecord *record,
                      uint8_t level)
    __attribute__((optimize("-O3")));
#endif
/* Return the forward offset from 'record' at 'level'. Levels >= 1
 * map directly to nextloc[level+1]; level 0 owns the two alternating
 * slots nextloc[0] and nextloc[1], and the winner is chosen so that
 * uncommitted (past-end) offsets are never followed alongside a
 * committed one. */
static size_t _getloc(struct dbengine *db, struct skiprecord *record,
                      uint8_t level)
{
    if (level)
        return record->nextloc[level + 1];

    /* if one slot points past the end, the other must be current */
    if (record->nextloc[0] >= db->end)
        return record->nextloc[1];
    if (record->nextloc[1] >= db->end)
        return record->nextloc[0];

    /* otherwise take the higher (most recently written) of the two */
    return (record->nextloc[0] > record->nextloc[1])
         ? record->nextloc[0]
         : record->nextloc[1];
}
/* set the next record at a given level, encapsulating the
* level 0 magic */
#ifdef HAVE_DECLARE_OPTIMIZE
static void _setloc(struct dbengine *db, struct skiprecord *record,
                    uint8_t level, size_t offset)
    __attribute__((optimize("-O3")));
#endif
/* Store 'offset' as the forward pointer of 'record' at 'level'.
 * Levels >= 1 map to nextloc[level+1]. Level 0 picks between its two
 * slots: a slot already updated this transaction (>= current_size) is
 * reused, otherwise the older slot is overwritten - preserving one
 * pre-transaction pointer for crash recovery. */
static void _setloc(struct dbengine *db, struct skiprecord *record,
                    uint8_t level, size_t offset)
{
    if (level) {
        record->nextloc[level + 1] = offset;
        return;
    }

    /* level zero is special: reuse a slot already touched this txn */
    if (record->nextloc[0] >= db->header.current_size)
        record->nextloc[0] = offset;
    else if (record->nextloc[1] >= db->header.current_size)
        record->nextloc[1] = offset;
    /* otherwise overwrite whichever slot is older */
    else if (record->nextloc[1] > record->nextloc[0])
        record->nextloc[0] = offset;
    else
        record->nextloc[1] = offset;
}
/* finds a record, either an exact match or the record
* immediately before */
#ifdef HAVE_DECLARE_OPTIMIZE
static int relocate(struct dbengine *db)
    __attribute__((optimize("-O3")));
#endif
/* Position db->loc at the key held in loc->keybuf: either an exact
 * match (is_exactmatch set, record loaded, tail CRC verified) or the
 * record immediately before it in compar() order. Fills backloc[] and
 * forwardloc[] at every level for cheap subsequent insert/delete.
 * Returns 0 on success or an error from the record readers.
 *
 * FIX: the original discarded the return value of the dummy-record
 * read, so a corrupt file could yield "success" with a zeroed
 * location; the error is now propagated. */
static int relocate(struct dbengine *db)
{
    struct skiploc *loc = &db->loc;
    struct skiprecord newrecord;
    size_t offset;
    size_t oldoffset = 0;
    uint8_t level;
    uint8_t i;
    int cmp = -1; /* never found a thing! */
    int r;

    /* mark which file state these cached offsets belong to */
    loc->generation = db->header.generation;
    loc->end = db->end;

    /* start the search from the dummy record */
    r = read_onerecord(db, DUMMY_OFFSET, &loc->record);
    if (r) return r;
    loc->is_exactmatch = 0;

    /* initialise pointers at the dummy's (maximum) level */
    level = loc->record.level;
    newrecord.offset = 0;
    loc->backloc[level] = loc->record.offset;
    loc->forwardloc[level] = 0;

    /* special case: empty key means "position at the start" */
    if (!loc->keybuf.len) {
        for (i = 0; i < loc->record.level; i++) {
            loc->backloc[i] = loc->record.offset;
            loc->forwardloc[i] = _getloc(db, &loc->record, i);
        }
        return 0;
    }

    /* standard skiplist descent: at each level walk forward while the
     * next key still sorts before the target, then drop a level */
    while (level) {
        offset = _getloc(db, &loc->record, level-1);
        loc->backloc[level-1] = loc->record.offset;
        loc->forwardloc[level-1] = offset;

        if (offset != oldoffset) {
            oldoffset = offset;
            r = read_skipdelete(db, offset, &newrecord);
            if (r) return r;

            if (newrecord.offset) {
                assert(newrecord.level >= level);

                cmp = db->compar(KEY(db, &newrecord), newrecord.keylen,
                                 loc->keybuf.s, loc->keybuf.len);

                /* still before the target? stay at this level */
                if (cmp < 0) {
                    /* move the offset range along */
                    loc->record = newrecord;
                    continue;
                }
            }
        }

        level--;
    }

    if (cmp == 0) {
        /* exact hit: load the record and all its forward pointers */
        loc->is_exactmatch = 1;
        loc->record = newrecord;

        for (i = 0; i < loc->record.level; i++)
            loc->forwardloc[i] = _getloc(db, &loc->record, i);

        /* make sure the key/value data is intact */
        r = check_tailcrc(db, &loc->record);
        if (r) return r;
    }

    return 0;
}
/* helper function to find a location, either by using the existing
 * location if it's close enough, or using the full relocate above.
 *
 * Fast paths (taken only when loc's generation/end stamps still match
 * the database, i.e. no other session has changed the file):
 *   - same key, already an exact match: nothing to do.
 *   - key sorts after the current location: peek at just the next
 *     record, which handles sequential-access patterns cheaply.
 * Anything else (including keys that sort BEFORE the current location,
 * whose back pointers would be stale) falls through to relocate(). */
static int find_loc(struct dbengine *db, const char *key, size_t keylen)
{
    struct skiprecord newrecord;
    struct skiploc *loc = &db->loc;
    int cmp, i, r;

    /* take a private copy of the key unless the caller passed us our
     * own buffer (in which case only the length may need fixing) */
    if (key != loc->keybuf.s)
        buf_setmap(&loc->keybuf, key, keylen);
    else if (keylen != loc->keybuf.len)
        buf_truncate(&loc->keybuf, keylen);

    /* can we special case advance? */
    if (keylen && loc->end == db->end
               && loc->generation == db->header.generation) {
        cmp = db->compar(KEY(db, &loc->record), loc->record.keylen,
                         loc->keybuf.s, loc->keybuf.len);
        /* same place, and was exact.  Otherwise we're going back,
         * and the reverse pointers are no longer valid... */
        if (db->loc.is_exactmatch && cmp == 0) {
            return 0;
        }

        /* we're looking after this record */
        if (cmp < 0) {
            for (i = 0; i < db->loc.record.level; i++)
                loc->backloc[i] = db->loc.record.offset;

            /* read the next record */
            r = read_skipdelete(db, loc->forwardloc[0], &newrecord);
            if (r) return r;

            /* nothing afterwards? */
            if (!newrecord.offset) {
                db->loc.is_exactmatch = 0;
                return 0;
            }

            /* now where is THIS record? */
            cmp = db->compar(KEY(db, &newrecord), newrecord.keylen,
                             loc->keybuf.s, loc->keybuf.len);

            /* exact match? */
            if (cmp == 0) {
                db->loc.is_exactmatch = 1;
                db->loc.record = newrecord;

                for (i = 0; i < newrecord.level; i++)
                    loc->forwardloc[i] = _getloc(db, &newrecord, i);

                /* make sure this record is complete */
                return check_tailcrc(db, &loc->record);
            }

            /* or in the gap */
            if (cmp > 0) {
                db->loc.is_exactmatch = 0;
                return 0;
            }
        }

        /* if we fell out here, it's not a "local" record, just search */
    }

    return relocate(db);
}
/* helper function to advance to the "next" record.  Used by foreach,
 * fetchnext, and internal functions.
 *
 * Follows forwardloc[0] from the current location, skipping deletes
 * (read_skipdelete).  On success loc holds the new record with
 * is_exactmatch set and keybuf updated; at end of file loc is reset
 * via relocate().  Returns 0 or a CYRUSDB error. */
static int advance_loc(struct dbengine *db)
{
    struct skiploc *loc = &db->loc;
    uint8_t i;
    int r;

    /* has another session made changes?  Need to re-find the location */
    if (loc->end != db->end || loc->generation != db->header.generation) {
        r = relocate(db);
        if (r) return r;
    }

    /* update back pointers */
    for (i = 0; i < loc->record.level; i++)
        loc->backloc[i] = loc->record.offset;

    /* ADVANCE */
    r = read_skipdelete(db, loc->forwardloc[0], &loc->record);
    if (r) return r;

    /* reached the end? */
    if (!loc->record.offset) {
        buf_reset(&loc->keybuf);
        return relocate(db);
    }

    /* update forward pointers */
    for (i = 0; i < loc->record.level; i++)
        loc->forwardloc[i] = _getloc(db, &loc->record, i);

    /* keep our location */
    buf_setmap(&loc->keybuf, KEY(db, &loc->record), loc->record.keylen);
    loc->is_exactmatch = 1;

    /* make sure this record is complete */
    r = check_tailcrc(db, &loc->record);
    if (r) return r;

    return 0;
}
/* helper function to update all the back records efficiently
 * after appending a new record, either create or delete.  The
 * caller must set forwardloc[] correctly for each level it has
 * changed.
 *
 * Walks backloc[] from level 0 up to 'maxlevel', rewriting each
 * predecessor so its forward pointers target the caller's new
 * forwardloc[] values, then re-reads the record at 'newoffset' into
 * loc->record and refreshes loc->forwardloc[] from it. */
static int stitch(struct dbengine *db, uint8_t maxlevel, size_t newoffset)
{
    struct skiploc *loc = &db->loc;
    struct skiprecord oldrecord;
    uint8_t i;
    int r;

    oldrecord.level = 0;
    while (oldrecord.level < maxlevel) {
        uint8_t level = oldrecord.level;

        r = read_onerecord(db, loc->backloc[level], &oldrecord);
        if (r) return r;

        /* always getting higher */
        assert(oldrecord.level > level);

        /* repoint every level this predecessor covers */
        for (i = level; i < maxlevel; i++)
            _setloc(db, &oldrecord, i, loc->forwardloc[i]);

        r = rewrite_record(db, &oldrecord);
        if (r) return r;
    }

    /* re-read the "current record" */
    r = read_onerecord(db, newoffset, &loc->record);
    if (r) return r;

    /* and update the forward locations */
    for (i = 0; i < loc->record.level; i++)
        loc->forwardloc[i] = _getloc(db, &loc->record, i);

    return 0;
}
/* overall "store" function - update the value in the current loc.
   All new values funnel through here.  Check delete_here for
   deletion.  Force is implied here, it gets checked higher.
 *
 * Appends a new RECORD for loc->keybuf with the given value, then
 * stitches all predecessors up to max(old level, new level) so the
 * list skips any replaced record.  Assumes find_loc has already
 * positioned db->loc on (or just before) the key. */
static int store_here(struct dbengine *db, const char *val, size_t vallen)
{
    struct skiploc *loc = &db->loc;
    struct skiprecord newrecord;
    uint8_t level = 0;
    uint8_t i;
    int r;

    /* replacing: the old record's level must also be stitched over,
     * and its count is re-added below for the replacement */
    if (loc->is_exactmatch) {
        level = loc->record.level;
        db->header.num_records--;
    }

    /* build a new record */
    memset(&newrecord, 0, sizeof(struct skiprecord));
    newrecord.type = RECORD;
    newrecord.level = randlvl(1, MAXLEVEL);
    newrecord.keylen = loc->keybuf.len;
    newrecord.vallen = vallen;
    /* nextloc[0..1] are the level-0 pair, so level i lives at i+1 */
    for (i = 0; i < newrecord.level; i++)
        newrecord.nextloc[i+1] = loc->forwardloc[i];
    if (newrecord.level > level)
        level = newrecord.level;

    /* append to the file */
    r = append_record(db, &newrecord, loc->keybuf.s, val);
    if (r) return r;

    /* get the nextlevel to point here for all this record's levels */
    for (i = 0; i < newrecord.level; i++)
        loc->forwardloc[i] = newrecord.offset;

    /* update all backpointers */
    r = stitch(db, level, newrecord.offset);
    if (r) return r;

    /* update header to know details of new record */
    db->header.num_records++;

    loc->is_exactmatch = 1;
    loc->end = db->end;

    return 0;
}
/* delete a record
 *
 * Appends a DELETE tombstone pointing at the record after the one
 * being removed, then stitches the predecessors (up to the deleted
 * record's level) so their pointers skip past it.  Requires db->loc
 * to be an exact match; returns CYRUSDB_NOTFOUND otherwise. */
static int delete_here(struct dbengine *db)
{
    struct skiploc *loc = &db->loc;
    struct skiprecord newrecord;
    struct skiprecord nextrecord;
    int r;

    if (!loc->is_exactmatch)
        return CYRUSDB_NOTFOUND;

    db->header.num_records--;

    /* by the magic of zeroing, this even works for zero */
    r = read_skipdelete(db, loc->forwardloc[0], &nextrecord);
    if (r) return r;

    /* build a delete record */
    memset(&newrecord, 0, sizeof(struct skiprecord));
    newrecord.type = DELETE;
    newrecord.nextloc[0] = nextrecord.offset;

    /* append to the file */
    r = append_record(db, &newrecord, NULL, NULL);
    if (r) return r;

    /* get the nextlevel to point here */
    loc->forwardloc[0] = newrecord.offset;

    /* update all backpointers right up to the old record's
     * level, so that they all point past */
    r = stitch(db, loc->record.level, loc->backloc[0]);
    if (r) return r;

    /* update location */
    loc->is_exactmatch = 0;
    loc->end = db->end;

    return 0;
}
/************ DATABASE STRUCT AND TRANSACTION MANAGEMENT **************/
/* A database is "clean" when the header's committed size matches the
 * actual file size and the DIRTY flag is not set. */
static int db_is_clean(struct dbengine *db)
{
    int size_matches = (db->header.current_size == SIZE(db));
    int not_dirty = !(db->header.flags & DIRTY);

    return size_matches && not_dirty;
}
/* Drop whatever lock is currently held on the backing file. */
static int unlock(struct dbengine *db)
{
    int r = mappedfile_unlock(db->mf);
    return r;
}
/* Take an exclusive lock on the backing file.  If the database is
 * already open, reread the header (another process may have written)
 * and run recovery, which checks for consistency. */
static int write_lock(struct dbengine *db)
{
    int r = mappedfile_writelock(db->mf);
    if (r) return r;

    if (db->is_open) {
        /* reread header, then let recovery verify/repair the file */
        r = read_header(db);
        if (!r) r = recovery(db);
        if (r) return r;
    }

    return 0;
}
/* Take a shared lock on the backing file and reread the header.
 * If the file turns out to be inconsistent, temporarily upgrade to a
 * write lock (which runs recovery), then downgrade by recursing to
 * take a fresh read lock.  The unlock/write_lock/unlock/read_lock
 * sequence is deliberate: mappedfile locks cannot be upgraded in
 * place. */
static int read_lock(struct dbengine *db)
{
    int r = mappedfile_readlock(db->mf);
    if (r) return r;

    /* reread header */
    if (db->is_open) {
        r = read_header(db);
        if (r) return r;

        /* we just take and keep a write lock if inconsistent,
         * the write lock will fix it up */
        if (!db_is_clean(db)) {
            unlock(db);
            r = write_lock(db);
            if (r) return r;
            /* downgrade to a read lock again, since that what
             * was requested */
            unlock(db);
            return read_lock(db);
        }
    }

    return 0;
}
/* Begin a new transaction: take the appropriate lock (shared = read,
 * otherwise write), allocate a txn object, record it as the current
 * transaction, and hand it back through *tidptr. */
static int newtxn(struct dbengine *db, int shared, struct txn **tidptr)
{
    assert(!db->current_txn);
    assert(!*tidptr);

    /* grab a lock of the requested strength */
    int r;
    if (shared)
        r = read_lock(db);
    else
        r = write_lock(db);
    if (r) return r;

    db->txn_num++;

    /* allocate and initialise the transaction object */
    struct txn *newtid = xzmalloc(sizeof(struct txn));
    newtid->num = db->txn_num;
    newtid->shared = shared;
    db->current_txn = newtid;

    /* pass it back out */
    *tidptr = newtid;

    return 0;
}
/* Tear down a dbengine: release any held lock, close the mapped file,
 * free the key buffer and the structure itself.  NULL is tolerated so
 * callers can dispose unconditionally on error paths. */
static void dispose_db(struct dbengine *db)
{
    if (db == NULL)
        return;

    if (db->mf != NULL) {
        /* drop any lock we still hold before closing the file */
        if (mappedfile_islocked(db->mf))
            unlock(db);
        mappedfile_close(&db->mf);
    }

    buf_free(&db->loc.keybuf);
    free(db);
}
/************************************************************/
/* Open (and if CYRUSDB_CREATE, possibly create) a twoskip database.
 *
 * Starts with a cheap read lock; the `if (0) { retry_write: ... }`
 * construct is a goto target used to upgrade to a write lock when the
 * file needs initialisation or recovery.  A zero-length file gets a
 * fresh DUMMY node and header.  On success *ret holds the engine and,
 * if mytid was supplied, a transaction is opened into *mytid.  On any
 * error the engine is disposed before returning. */
static int opendb(const char *fname, int flags, struct dbengine **ret, struct txn **mytid)
{
    struct dbengine *db;
    int r;
    int mappedfile_flags = MAPPEDFILE_RW;

    assert(fname);
    assert(ret);

    db = (struct dbengine *) xzmalloc(sizeof(struct dbengine));

    if (flags & CYRUSDB_CREATE)
        mappedfile_flags |= MAPPEDFILE_CREATE;

    db->open_flags = flags & ~CYRUSDB_CREATE;
    db->compar = (flags & CYRUSDB_MBOXSORT)
               ? bsearch_ncompare_mbox
               : bsearch_ncompare_raw;

    r = mappedfile_open(&db->mf, fname, mappedfile_flags);
    if (r) {
        /* convert to CYRUSDB errors*/
        if (r == -ENOENT) r = CYRUSDB_NOTFOUND;
        else r = CYRUSDB_IOERROR;
        goto done;
    }

    db->is_open = 0;

    /* grab a read lock, only reading the header */
    r = read_lock(db);
    if (r) goto done;

    /* if there's any issue which requires fixing, get a write lock */
    if (0) {
        /* jumped to from below when a write lock is required; the
         * surrounding if (0) means it never runs on the way down */
 retry_write:
        unlock(db);
        db->is_open = 0;
        r = write_lock(db);
        if (r) goto done;
    }

    /* if the map size is zero, it's a new file - we need to create an
     * initial header */
    if (mappedfile_size(db->mf) == 0) {
        struct skiprecord dummy;

        if (!mappedfile_iswritelocked(db->mf))
            goto retry_write;

        /* create the dummy! */
        memset(&dummy, 0, sizeof(struct skiprecord));
        dummy.type = DUMMY;
        dummy.level = MAXLEVEL;

        /* append dummy after header location */
        db->end = DUMMY_OFFSET;
        r = write_record(db, &dummy, NULL, NULL);
        if (r) {
            xsyslog(LOG_ERR, "DBERROR: error writing dummy node",
                             "filename=<%s>", fname);
            goto done;
        }

        /* create the header */
        db->header.version = VERSION;
        db->header.generation = 1;
        db->header.repack_size = db->end;
        db->header.current_size = db->end;
        r = commit_header(db);
        if (r) {
            xsyslog(LOG_ERR, "DBERROR: error writing header",
                             "filename=<%s>", fname);
            goto done;
        }
    }

    db->is_open = 1;

    r = read_header(db);
    if (r) goto done;

    if (!db_is_clean(db)) {
        if (!mappedfile_iswritelocked(db->mf))
            goto retry_write;

        /* recovery will clean the flag once it's committed the fixes */
        r = recovery(db);
        if (r) goto done;
    }

    /* unlock the DB */
    unlock(db);

    *ret = db;

    if (mytid) {
        r = newtxn(db, flags & CYRUSDB_SHARED, mytid);
        if (r) goto done;
    }

done:
    if (r) dispose_db(db);
    return r;
}
/* Public open entry point.  Databases are shared per-process via the
 * open_twoskip list: re-opening an already-open file bumps its
 * refcount (unless a transaction is in flight, which is treated as a
 * caller bug).  Otherwise delegates to opendb() and registers the new
 * engine on the list. */
static int myopen(const char *fname, int flags, struct dbengine **ret, struct txn **mytid)
{
    struct db_list *ent;
    struct dbengine *mydb;
    int r = 0;

    /* do we already have this DB open? */
    for (ent = open_twoskip; ent; ent = ent->next) {
        if (strcmp(FNAME(ent->db), fname)) continue;
        if (ent->db->current_txn) {
            /* XXX we could gracefully handle attempts to open
             * a shared-lock database multiple times.e.g by
             * ref-counting transactions. But it's likely that
             * multiple open attempts are a bug in the caller's
             * logic, so error out here */
            return CYRUSDB_LOCKED;
        }
        if (mytid) {
            r = newtxn(ent->db, flags & CYRUSDB_SHARED, mytid);
            if (r) return r;
        }
        ent->refcount++;
        *ret = ent->db;
        return 0;
    }

    r = opendb(fname, flags, &mydb, mytid);
    if (r) return r;

    /* track this database in the open list */
    ent = (struct db_list *) xzmalloc(sizeof(struct db_list));
    ent->db = mydb;
    ent->refcount = 1;
    ent->next = open_twoskip;
    open_twoskip = ent;

    /* return the open DB */
    *ret = mydb;

    return 0;
}
/* Public close entry point.  Drops one reference from the matching
 * open_twoskip list entry; when the last reference goes, the entry is
 * unlinked, a warning is logged if the file is still locked, and the
 * engine is disposed. */
static int myclose(struct dbengine *db)
{
    struct db_list *node;
    struct db_list *before = NULL;

    assert(db);

    /* locate the entry for this database in the open list */
    for (node = open_twoskip; node; node = node->next) {
        if (node->db == db) break;
        before = node;
    }
    assert(node);

    node->refcount--;
    if (node->refcount > 0)
        return 0;

    /* last reference gone: unlink the entry and tear down the DB */
    if (before) before->next = node->next;
    else open_twoskip = node->next;
    free(node);

    if (mappedfile_islocked(db->mf))
        syslog(LOG_ERR, "twoskip: %s closed while still locked", FNAME(db));
    dispose_db(db);

    return 0;
}
/*************** EXTERNAL APIS ***********************/
/* Fetch a record (or, with fetchnext, the record after it).
 *
 * Locking: with a transaction (explicit or the current one), the read
 * happens inside it; otherwise a read lock is taken for the duration
 * and released in the done: block.  Outputs foundkey/foundkeylen/data/
 * datalen are each optional; data points into the mapped file and is
 * only valid while the lock or transaction is held.  Returns
 * CYRUSDB_NOTFOUND when there is no exact match. */
static int myfetch(struct dbengine *db,
                   const char *key, size_t keylen,
                   const char **foundkey, size_t *foundkeylen,
                   const char **data, size_t *datalen,
                   struct txn **tidptr, int fetchnext)
{
    int r = 0;

    assert(db);
    if (datalen) assert(data);

    if (data) *data = NULL;
    if (datalen) *datalen = 0;

    /* Hacky workaround:
     *
     * If no transaction was passed, but we're in a transaction,
     * then just do the read within that transaction.
     */
    if (!tidptr && db->current_txn)
        tidptr = &db->current_txn;

    if (tidptr) {
        if (!*tidptr) {
            r = newtxn(db, 0/*shared*/, tidptr);
            if (r) return r;
        }
    }
    else {
        /* grab a r lock */
        r = read_lock(db);
        if (r) return r;
    }

    r = find_loc(db, key, keylen);
    if (r) goto done;

    if (fetchnext) {
        r = advance_loc(db);
        if (r) goto done;
    }

    if (foundkey) *foundkey = db->loc.keybuf.s;
    if (foundkeylen) *foundkeylen = db->loc.keybuf.len;

    if (db->loc.is_exactmatch) {
        if (data) *data = VAL(db, &db->loc.record);
        if (datalen) *datalen = db->loc.record.vallen;
    }
    else {
        /* we didn't get an exact match */
        r = CYRUSDB_NOTFOUND;
    }

done:
    if (!tidptr) {
        /* release read lock */
        int r1;
        if ((r1 = unlock(db)) < 0) {
            return r1;
        }
    }

    return r;
}
/* foreach allows for subsidiary mailbox operations in 'cb'.
   if there is a txn, 'cb' must make use of it.
 *
 * Iterates every record whose key starts with prefix (empty prefix =
 * whole database), calling goodp (filter, may be NULL) and then cb.
 * When running without a transaction, the read lock is dropped around
 * each callback (and periodically, every FOREACH_LOCK_RELEASE skipped
 * records) and the position re-found by key afterwards, since the
 * callback may modify the database.  keybuf holds the saved key across
 * those unlock windows.  Returns the first error, else cb's non-zero
 * return. */
static int myforeach(struct dbengine *db,
                     const char *prefix, size_t prefixlen,
                     foreach_p *goodp,
                     foreach_cb *cb, void *rock,
                     struct txn **tidptr)
{
    int r = 0, cb_r = 0;
    int num_misses = 0;
    int need_unlock = 0;
    const char *val;
    size_t vallen;
    struct buf keybuf = BUF_INITIALIZER;

    assert(db);
    assert(cb);
    if (prefixlen) assert(prefix);

    /* Hacky workaround:
     *
     * If no transaction was passed, but we're in a transaction,
     * then just do the read within that transaction.
     */
    if (!tidptr && db->current_txn)
        tidptr = &db->current_txn;

    if (tidptr) {
        if (!*tidptr) {
            r = newtxn(db, 0/*shared*/, tidptr);
            if (r) return r;
        }
    }
    else {
        /* grab a r lock */
        r = read_lock(db);
        if (r) return r;
        need_unlock = 1;
    }

    r = find_loc(db, prefix, prefixlen);
    if (r) goto done;

    if (!db->loc.is_exactmatch) {
        /* advance to the first match */
        r = advance_loc(db);
        if (r) goto done;
    }

    while (db->loc.is_exactmatch) {
        /* does it match prefix? */
        if (prefixlen) {
            if (db->loc.record.keylen < prefixlen) break;
            if (db->compar(KEY(db, &db->loc.record), prefixlen,
                           prefix, prefixlen)) break;
        }

        val = VAL(db, &db->loc.record);
        vallen = db->loc.record.vallen;

        if (!goodp || goodp(rock, db->loc.keybuf.s, db->loc.keybuf.len,
                            val, vallen)) {
            /* take a copy of they key - just in case cb does actions on this database
             * and clobbers loc */
            buf_copy(&keybuf, &db->loc.keybuf);

            if (!tidptr) {
                /* release read lock */
                r = unlock(db);
                if (r) goto done;
                need_unlock = 0;
            }

            /* make callback */
            cb_r = cb(rock, db->loc.keybuf.s, db->loc.keybuf.len,
                      val, vallen);
            if (cb_r) break;

            if (!tidptr) {
                /* grab a r lock */
                r = read_lock(db);
                if (r) goto done;
                need_unlock = 1;

                num_misses = 0;
            }

            /* should be cheap if we're already here */
            r = find_loc(db, keybuf.s, keybuf.len);
            if (r) goto done;
        }
        else if (!tidptr) {
            /* filtered out: cycle the lock occasionally so long scans
             * don't starve writers */
            num_misses++;
            if (num_misses > FOREACH_LOCK_RELEASE) {
                /* take a copy of they key - just in case cb does actions on this database
                 * and clobbers loc */
                buf_copy(&keybuf, &db->loc.keybuf);

                /* release read lock */
                r = unlock(db);
                if (r) goto done;
                need_unlock = 0;

                /* grab a r lock */
                r = read_lock(db);
                if (r) goto done;
                need_unlock = 1;

                /* should be cheap if we're already here */
                r = find_loc(db, keybuf.s, keybuf.len);
                if (r) goto done;

                num_misses = 0;
            }
        }

        /* move to the next one */
        r = advance_loc(db);
        if (r) goto done;
    }

 done:

    buf_free(&keybuf);

    if (need_unlock) {
        /* release read lock */
        int r1 = unlock(db);
        if (r1) return r1;
    }

    return r ? r : cb_r;
}
/* helper function for all writes - wraps create and delete and the FORCE
 * logic for each.  data == NULL means delete; force controls whether an
 * existing record may be replaced / a missing record may be "deleted". */
static int skipwrite(struct dbengine *db,
                     const char *key, size_t keylen,
                     const char *data, size_t datalen,
                     int force)
{
    int r = find_loc(db, key, keylen);
    if (r) return r;

    if (!db->loc.is_exactmatch) {
        /* no existing record: create unless this was a delete */
        if (data)
            return store_here(db, data, datalen);

        /* deleting a missing key is only OK when forced */
        return force ? 0 : CYRUSDB_NOTFOUND;
    }

    /* an existing record: delete, refuse, skip, or replace */
    if (!data)
        return delete_here(db);

    if (!force)
        return CYRUSDB_EXISTS;

    /* identical value?  Save the IO */
    if (db->compar(data, datalen,
                   VAL(db, &db->loc.record),
                   db->loc.record.vallen) == 0)
        return 0;

    return store_here(db, data, datalen);
}
/* Commit the current transaction.
 *
 * Write ordering is crucial: append the COMMIT record, fsync the data
 * (mappedfile_commit), and only then update the header (clearing
 * DIRTY and advancing current_size) and commit again.  On error the
 * transaction is aborted.  On success, a non-shared commit may trigger
 * an opportunistic checkpoint when the file has grown past MINREWRITE
 * and to more than double its last repack size; mycheckpoint/unlock
 * release the write lock either way. */
static int mycommit(struct dbengine *db, struct txn *tid)
{
    struct skiprecord newrecord;
    int r = 0;

    assert(db);
    assert(tid == db->current_txn);

    /* no need to commit if we're not dirty */
    if (!(db->header.flags & DIRTY)) goto done;

    assert(db->current_txn);
    /* shared (read-only) transactions never write a commit record */
    if (db->current_txn->shared) goto done;

    /* build a commit record */
    memset(&newrecord, 0, sizeof(struct skiprecord));
    newrecord.type = COMMIT;
    newrecord.nextloc[0] = db->header.current_size;

    /* append to the file */
    r = append_record(db, &newrecord, NULL, NULL);
    if (r) goto done;

    /* commit ALL outstanding changes first, before
     * rewriting the header */
    r = mappedfile_commit(db->mf);
    if (r) goto done;

    /* finally, update the header and commit again */
    db->header.current_size = db->end;
    db->header.flags &= ~DIRTY;
    r = commit_header(db);

 done:
    if (r) {
        int r2;

        /* error during commit; we must abort */
        r2 = myabort(db, tid);
        if (r2) {
            xsyslog(LOG_ERR, "DBERROR: commit AND abort failed",
                             "filename=<%s>", FNAME(db));
        }
    }
    else {
        if (db->current_txn && !db->current_txn->shared
            && !(db->open_flags & CYRUSDB_NOCOMPACT)
            && db->header.current_size > MINREWRITE
            && db->header.current_size > 2 * db->header.repack_size) {
            int r2 = mycheckpoint(db);
            if (r2) {
                syslog(LOG_NOTICE, "twoskip: failed to checkpoint %s: %m",
                       FNAME(db));
            }
        }
        else {
            unlock(db);
        }

        free(tid);
        db->current_txn = NULL;
    }

    return r;
}
/* Abort the current transaction: free the txn, roll db->end back to
 * the committed size, let recovery1 repair the on-disk pointers, wipe
 * the cached location (its pointers reference abandoned records), and
 * release the write lock. */
static int myabort(struct dbengine *db, struct txn *tid)
{
    int r;

    assert(db);
    assert(tid == db->current_txn);

    /* free the tid */
    free(tid);
    db->current_txn = NULL;
    db->end = db->header.current_size;

    /* recovery will clean up */
    r = recovery1(db, NULL);

    /* the location is no longer valid after rollback */
    buf_free(&db->loc.keybuf);
    memset(&db->loc, 0, sizeof(struct skiploc));

    unlock(db);

    return r;
}
/* Public store/delete entry point (data == NULL deletes).
 *
 * Shared (read-only) transactions are rejected.  Without a caller
 * transaction a local one is created and committed here; with one, the
 * write stays pending in it.  Any skipwrite failure aborts the
 * transaction (caller's pointer is NULLed) — partial writes must not
 * survive. */
static int mystore(struct dbengine *db,
                   const char *key, size_t keylen,
                   const char *data, size_t datalen,
                   struct txn **tidptr, int force)
{
    struct txn *localtid = NULL;
    int r = 0;

    assert(db);
    assert(key && keylen);

    /* reject store for shared locks */
    if (tidptr && *tidptr && (*tidptr)->shared)
        return CYRUSDB_READONLY;

    /* not keeping the transaction, just create one local to
     * this function */
    if (!tidptr) tidptr = &localtid;

    /* make sure we're write locked and up to date */
    if (!*tidptr) {
        r = newtxn(db, 0/*shared*/, tidptr);
        if (r) return r;
    }

    r = skipwrite(db, key, keylen, data, datalen, force);

    if (r) {
        int r2 = myabort(db, *tidptr);
        *tidptr = NULL;
        return r2 ? r2 : r;
    }
    if (localtid) {
        /* commit the store, which releases the write lock */
        r = mycommit(db, localtid);
    }

    return r;
}
/* compress 'db', closing at the end. Uses foreach to copy into a new
* database, then rewrites over the old one */
struct
copy_rock
{
struct
dbengine
*
db
;
struct
txn
*
tid
;
};
/* foreach callback: forward each record into the destination database,
 * carrying the destination transaction along in the rock. */
static int copy_cb(void *rock,
                   const char *key, size_t keylen,
                   const char *val, size_t vallen)
{
    struct copy_rock *dest = (struct copy_rock *) rock;

    int r = mystore(dest->db, key, keylen, val, vallen, &dest->tid, 0);
    return r;
}
/* Repack the database into FNAME.NEW and rename it into place.
 *
 * Called with the write lock held.  Verifies consistency before and
 * after the copy, commits the new file, then renames it over the old
 * one and splices the new engine's state into *db (so callers' handles
 * stay valid).  On error the partially-built new file is aborted,
 * unlinked and disposed, and the lock released. */
static int mycheckpoint(struct dbengine *db)
{
    size_t old_size = db->header.current_size;
    char newfname[1024];
    clock_t start = sclock();
    struct copy_rock cr;
    int r = 0;

    r = myconsistent(db, db->current_txn);
    if (r) {
        syslog(LOG_ERR, "db %s, inconsistent pre-checkpoint, bailing out",
               FNAME(db));
        unlock(db);
        return r;
    }

    /* open fname.NEW */
    snprintf(newfname, sizeof(newfname), "%s.NEW", FNAME(db));
    unlink(newfname);

    cr.db = NULL;
    cr.tid = NULL;
    r = opendb(newfname, db->open_flags | CYRUSDB_CREATE, &cr.db, &cr.tid);
    if (r) return r;

    /* copy every live record across within a single transaction */
    r = myforeach(db, NULL, 0, NULL, copy_cb, &cr, &db->current_txn);
    if (r) goto err;

    r = myconsistent(cr.db, cr.tid);
    if (r) {
        syslog(LOG_ERR, "db %s, inconsistent post-checkpoint, bailing out",
               FNAME(db));
        goto err;
    }

    /* remember the repack size */
    cr.db->header.repack_size = cr.db->end;

    /* increase the generation count */
    cr.db->header.generation = db->header.generation + 1;

    r = mycommit(cr.db, cr.tid);
    if (r) goto err;
    cr.tid = NULL; /* avoid later errors trying to call abort, it's too late! */

    /* move new file to original file name */
    r = mappedfile_rename(cr.db->mf, FNAME(db));
    if (r) goto err;

    /* OK, we're committed now - clean up */
    unlock(db);

    /* gotta clean it all up */
    mappedfile_close(&db->mf);
    buf_free(&db->loc.keybuf);

    /* adopt the new engine's state in place */
    *db = *cr.db;
    free(cr.db); /* leaked? */

    {
        syslog(LOG_INFO,
               "twoskip: checkpointed %s (%llu record%s, %llu => %llu bytes) in %2.3f seconds",
               FNAME(db), (LLU)db->header.num_records,
               db->header.num_records == 1 ? "" : "s",
               (LLU)old_size, (LLU)(db->header.current_size),
               (sclock() - start) / (double) CLOCKS_PER_SEC);
    }

    return 0;

 err:
    if (cr.tid) myabort(cr.db, cr.tid);
    unlink(FNAME(cr.db));
    dispose_db(cr.db);
    unlock(db);
    return CYRUSDB_IOERROR;
}
/* dump the database.
   if detail == 1, dump all records.
   if detail == 2, also dump pointers for active records.
   if detail == 3, dump all records/all pointers.
 *
 * Walks the file linearly from DUMMY_OFFSET to the committed size,
 * printing one line per record (with CRC diagnostics on corruption)
 * to stdout.  NUL bytes in keys/values are shown as '-'. */
static int dump(struct dbengine *db, int detail)
{
    struct skiprecord record;
    struct buf scratch = BUF_INITIALIZER;
    size_t offset = DUMMY_OFFSET;
    int r = 0;
    int i;

    printf("HEADER: v=%lu fl=%lu num=%llu sz=(%08llX/%08llX)\n",
          (LU)db->header.version,
          (LU)db->header.flags,
          (LLU)db->header.num_records,
          (LLU)db->header.current_size,
          (LLU)db->header.repack_size);

    while (offset < db->header.current_size) {
        printf("%08llX ", (LLU)offset);

        // skip over blanks
        if (!memcmp(BASE(db) + offset, BLANK, 8)) {
            printf("BLANK\n");
            offset += 8;
            continue;
        }

        r = read_onerecord(db, offset, &record);
        if (r) {
            /* a non-zero keyoffset means the header parsed far enough
             * to report the CRC mismatch in detail */
            if (record.keyoffset)
                printf("ERROR [HEADCRC %08lX %08lX]\n",
                      (long unsigned) record.crc32_head,
                      (long unsigned) crc32_map(BASE(db) + record.offset,
                                                record.keyoffset - 8));
            else
                printf("ERROR\n");
            break;
        }

        if (check_tailcrc(db, &record)) {
            printf("ERROR [TAILCRC %08lX %08lX] ",
                  (long unsigned) record.crc32_tail,
                  (long unsigned) crc32_map(BASE(db) + record.keyoffset,
                                            roundup(record.keylen + record.vallen, 8)));
        }

        switch (record.type) {
        case DELETE:
            printf("DELETE ptr=%08llX\n", (LLU)record.nextloc[0]);
            break;

        case COMMIT:
            printf("COMMIT start=%08llX\n", (LLU)record.nextloc[0]);
            break;

        case RECORD:
        case DUMMY:
            buf_setmap(&scratch, KEY(db, &record), record.keylen);
            buf_replace_char(&scratch, '\0', '-');
            printf("%s kl=%llu dl=%llu lvl=%d (%s)\n",
                   (record.type == RECORD ? "RECORD" : "DUMMY"),
                   (LLU)record.keylen, (LLU)record.vallen,
                   record.level, buf_cstring(&scratch));
            printf("\t");
            for (i = 0; i <= record.level; i++) {
                printf("%08llX ", (LLU)record.nextloc[i]);
                if (!(i % 8))
                    printf("\n\t");
            }
            printf("\n");
            if (detail > 2) {
                buf_setmap(&scratch, VAL(db, &record), record.vallen);
                buf_replace_char(&scratch, '\0', '-');
                printf("\tv=(%s)\n", buf_cstring(&scratch));
            }
            break;
        }

        offset += record.len;
    }

    buf_free(&scratch);

    return r;
}
/* Public consistency-check entry point: take a read lock, run the
 * internal checker outside any transaction, then drop the lock. */
static int consistent(struct dbengine *db)
{
    int result = read_lock(db);
    if (result) return result;

    result = myconsistent(db, NULL);
    unlock(db);

    return result;
}
/* perform some basic consistency checks
 *
 * Walks the level-0 chain from the dummy, carrying the expected
 * forward offsets for every level in fwd[], and verifies:
 *   - keys are strictly increasing;
 *   - every level pointer lands on the record it should;
 *   - all pointers are NULL at the tail;
 *   - the record count matches the header.
 * Returns CYRUSDB_INTERNAL (with logging) on any violation. */
static int myconsistent(struct dbengine *db, struct txn *tid)
{
    struct skiprecord prevrecord;
    struct skiprecord record;
    size_t fwd[MAXLEVEL];
    size_t num_records = 0;
    int r = 0;
    int cmp;
    int i;

    assert(db->current_txn == tid); /* could both be null */

    /* read in the dummy */
    r = read_onerecord(db, DUMMY_OFFSET, &prevrecord);
    if (r) return r;

    /* set up the location pointers */
    for (i = 0; i < MAXLEVEL; i++)
        fwd[i] = _getloc(db, &prevrecord, i);

    while (fwd[0]) {
        r = read_onerecord(db, fwd[0], &record);
        if (r) return r;

        /* tombstones only participate at level 0; follow and skip */
        if (record.type == DELETE) {
            fwd[0] = record.nextloc[0];
            continue;
        }

        cmp = db->compar(KEY(db, &record), record.keylen,
                         KEY(db, &prevrecord), prevrecord.keylen);
        if (cmp <= 0) {
            xsyslog(LOG_ERR, "DBERROR: twoskip out of order",
                             "fname=<%s> key=<%.*s> offset=<%08llX>"
                             " prevkey=<%.*s> prevoffset=<%08llX)",
                             FNAME(db),
                             (int)record.keylen, KEY(db, &record),
                             (LLU)record.offset,
                             (int)prevrecord.keylen, KEY(db, &prevrecord),
                             (LLU)prevrecord.offset);
            return CYRUSDB_INTERNAL;
        }

        for (i = 0; i < record.level; i++) {
            /* check the old pointer was to here */
            if (fwd[i] != record.offset) {
                xsyslog(LOG_ERR, "DBERROR: twoskip broken linkage",
                                 "filename=<%s> offset=<%08llX> level=<%d>"
                                 " expected=<%08llX>",
                                 FNAME(db), (LLU)record.offset, i,
                                 (LLU)fwd[i]);
                return CYRUSDB_INTERNAL;
            }
            /* and advance to the new pointer */
            fwd[i] = _getloc(db, &record, i);
        }

        /* keep a copy for comparison purposes */
        num_records++;
        prevrecord = record;
    }

    for (i = 0; i < MAXLEVEL; i++) {
        if (fwd[i]) {
            xsyslog(LOG_ERR, "DBERROR: twoskip broken tail",
                             "filename=<%s> offset=<%08llX> level=<%d>",
                             FNAME(db), (LLU)fwd[i], i);
            return CYRUSDB_INTERNAL;
        }
    }

    /* we walked the whole file and saw every pointer */

    if (num_records != db->header.num_records) {
        xsyslog(LOG_ERR, "DBERROR: twoskip record count mismatch",
                         "filename=<%s> num_records=<%llu> expected_records=<%llu>",
                         FNAME(db), (LLU)num_records,
                         (LLU)db->header.num_records);
        return CYRUSDB_INTERNAL;
    }

    return 0;
}
/* Replay one committed transaction from 'db' into 'newdb'.
 *
 * The commit record's nextloc[0] marks where its transaction began;
 * all records between there and the commit itself are re-applied via
 * mystore (forced) under a single destination transaction, which is
 * committed at the end or aborted on any error.  Used by recovery2. */
static int _copy_commit(struct dbengine *db, struct dbengine *newdb,
                        struct skiprecord *commit)
{
    struct txn *tid = NULL;
    struct skiprecord record;
    size_t offset;
    int r = 0;

    for (offset = commit->nextloc[0]; offset < commit->offset; offset += record.len) {
        // skip over blanks
        if (!memcmp(BASE(db) + offset, BLANK, 8)) {
            record.len = 8;
            continue;
        }
        r = read_onerecord(db, offset, &record);
        if (r) goto err;
        switch (record.type) {
        case DELETE:
            /* find the record we are deleting */
            r = read_onerecord(db, record.nextloc[0], &record);
            if (r) goto err;
            /* and delete it from the new DB */
            r = mystore(newdb, KEY(db, &record), record.keylen,
                        NULL, 0, &tid, 1);
            if (r) goto err;
            break;
        case RECORD:
            /* store into the new DB */
            r = mystore(newdb, KEY(db, &record), record.keylen,
                        VAL(db, &record), record.vallen, &tid, 1);
            if (r) goto err;
            break;
        default:
            /* anything else mid-transaction is corruption */
            r = CYRUSDB_IOERROR;
            goto err;
        }
    }

    /* tid is NULL if the transaction contained no records at all */
    return tid ? mycommit(newdb, tid) : 0;

 err:
    if (tid) myabort(newdb, tid);
    return r;
}
/* recovery2 - the file is really screwed.  Basically, we
 * failed to run recovery.  Try reading out records from
 * the top and applying commits to a new file instead.
 *
 * Scans the broken file linearly; unreadable records mark the
 * in-progress transaction dirty so its eventual COMMIT is skipped,
 * while clean transactions are replayed into FNAME.NEW via
 * _copy_commit.  If anything was rescued, the new file is renamed
 * into place and its engine state spliced into *db.  Optionally
 * reports the rescued record count through *count. */
static int recovery2(struct dbengine *db, int *count)
{
    uint64_t oldcount = db->header.num_records;
    struct skiprecord record;
    struct dbengine *newdb = NULL;
    char newfname[1024];
    size_t offset;
    int r = 0;

    /* open fname.NEW */
    snprintf(newfname, sizeof(newfname), "%s.NEW", FNAME(db));
    unlink(newfname);

    r = opendb(newfname, db->open_flags | CYRUSDB_CREATE, &newdb, NULL);
    if (r) return r;

    /* increase the generation count */
    newdb->header.generation = db->header.generation + 1;

    /* start with the dummy */
    int dirty = 0;
    for (offset = DUMMY_OFFSET; offset < SIZE(db); offset += record.len) {
        // skip over blanks
        if (!memcmp(BASE(db) + offset, BLANK, 8)) {
            record.len = 8;
            continue;
        }
        // if this record fails to read, keep looking ahead for a commit,
        // and mark this entire transaction as dirty
        r = read_onerecord(db, offset, &record);
        if (r) {
            dirty++;
            xsyslog(LOG_ERR, "DBERROR: failed to read in recovery2, continuing",
                             "filename=<%s> offset=<%08llX>",
                             FNAME(db), (LLU)offset);
            /* resync on the 8-byte grid and keep scanning */
            record.len = 8;
            continue;
        }
        if (record.type == COMMIT) {
            /* only replay transactions with no read failures */
            if (!dirty) {
                r = _copy_commit(db, newdb, &record);
                if (r) {
                    xsyslog(LOG_ERR, "DBERROR: failed to apply commit in recovery2, continuing",
                                     "filename=<%s> offset=<%08llX>",
                                     FNAME(db), (LLU)offset);
                }
            }
            dirty = 0;
        }
    }

    if (!newdb->header.num_records) {
        /* no records found - almost certainly bogus, and even if not,
         * there's no point recovering a zero record file */
        xsyslog(LOG_ERR, "DBERROR: no records found in recovery2, aborting",
                         "filename=<%s>", FNAME(db));
        r = CYRUSDB_NOTFOUND;
        goto err;
    }

    /* regardless, we had a commit during create, and in any _copy_commit, so
     * rename into place */

    /* move new file to original file name */
    r = mappedfile_rename(newdb->mf, FNAME(db));
    if (r) goto err;

    /* OK, we're committed now - clean up */
    unlock(db);

    /* gotta clean it all up */
    mappedfile_close(&db->mf);
    buf_free(&db->loc.keybuf);

    *db = *newdb;
    free(newdb); /* leaked? */

    syslog(LOG_NOTICE, "twoskip: recovery2 %s - rescued %llu of %llu records",
           FNAME(db), (LLU)db->header.num_records, (LLU)oldcount);

    if (count) *count = db->header.num_records;

    return 0;

 err:
    unlink(FNAME(newdb));
    myclose(newdb);
    return r;
}
/* run recovery on this file.
 * always called with a write lock.
 *
 * Walks the level-0 chain from the dummy record, validating key order,
 * repairing skip pointers (levels 2..MAXLEVEL) and clearing any level
 * 0/1 pointers that run past the known-good end of file.  On success
 * truncates the file back to header.current_size, clears the DIRTY
 * flag and writes back the recounted num_records.
 *
 * Returns 0 on success or a CYRUSDB_* error; *count (if non-NULL)
 * receives the number of pointer fixups performed. */
static int recovery1(struct dbengine *db, int *count)
{
    size_t prev[MAXLEVEL+1];
    size_t next[MAXLEVEL+1];
    struct skiprecord record;
    struct skiprecord prevrecord;
    struct skiprecord fixrecord;
    size_t nextoffset = 0;
    uint64_t num_records = 0;
    int changed = 0;
    int r = 0;
    int cmp;
    int i;

    /* no need to run recovery if we're consistent */
    if (db_is_clean(db))
        return 0;

    assert(mappedfile_iswritelocked(db->mf));

    /* we can't recovery a file that's not created yet */
    assert(db->header.current_size > HEADER_SIZE);

    /* dirty the header if not already dirty */
    if (!(db->header.flags & DIRTY)) {
        db->header.flags |= DIRTY;
        r = commit_header(db);
        if (r) return r;
    }

    /* start with the dummy */
    r = read_onerecord(db, DUMMY_OFFSET, &prevrecord);
    if (r) return r;

    /* and pointers forwards */
    for (i = 2; i <= MAXLEVEL; i++) {
        prev[i] = prevrecord.offset;
        next[i] = prevrecord.nextloc[i];
    }

    /* check for broken level - pointers */
    for (i = 0; i < 2; i++) {
        if (prevrecord.nextloc[i] >= db->end) {
            prevrecord.nextloc[i] = 0;
            r = rewrite_record(db, &prevrecord);
            /* bugfix: this error return was previously ignored, unlike
             * every other rewrite_record() call site in this function */
            if (r) return r;
            changed++;
        }
    }

    nextoffset = _getloc(db, &prevrecord, 0);

    while (nextoffset) {
        r = read_onerecord(db, nextoffset, &record);
        if (r) return r;

        /* just skip over delete records */
        if (record.type == DELETE) {
            nextoffset = record.nextloc[0];
            continue;
        }

        /* keys must be strictly ascending along the level-0 chain */
        cmp = db->compar(KEY(db, &record), record.keylen,
                         KEY(db, &prevrecord), prevrecord.keylen);
        if (cmp <= 0) {
            xsyslog(LOG_ERR, "DBERROR: twoskip out of order",
                             "filename=<%s>"
                             " record_key=<%.*s> record_offset=<%08llX>"
                             " prev_key=<%.*s> prev_offset=<%08llX>",
                             FNAME(db),
                             (int)record.keylen, KEY(db, &record),
                             (LLU)record.offset,
                             (int)prevrecord.keylen, KEY(db, &prevrecord),
                             (LLU)prevrecord.offset);
            return CYRUSDB_INTERNAL;
        }

        /* check for old offsets needing fixing */
        for (i = 2; i <= record.level; i++) {
            if (next[i] != record.offset) {
                /* need to fix up the previous record to point here */
                r = read_onerecord(db, prev[i], &fixrecord);
                if (r) return r;
                /* XXX - optimise adjacent same records */
                fixrecord.nextloc[i] = record.offset;
                r = rewrite_record(db, &fixrecord);
                if (r) return r;
                changed++;
            }
            prev[i] = record.offset;
            next[i] = record.nextloc[i];
        }

        /* check for broken level - pointers */
        for (i = 0; i < 2; i++) {
            if (record.nextloc[i] >= db->end) {
                record.nextloc[i] = 0;
                r = rewrite_record(db, &record);
                if (r) return r;
                changed++;
            }
        }

        num_records++;

        /* find the next record */
        nextoffset = _getloc(db, &record, 0);

        prevrecord = record;
    }

    /* check for remaining offsets needing fixing */
    for (i = 2; i <= MAXLEVEL; i++) {
        if (next[i]) {
            /* need to fix up the previous record to point to the end */
            r = read_onerecord(db, prev[i], &fixrecord);
            if (r) return r;
            /* XXX - optimise, same as above */
            fixrecord.nextloc[i] = 0;
            r = rewrite_record(db, &fixrecord);
            if (r) return r;
            changed++;
        }
    }

    /* throw away anything past the last known-good commit */
    r = mappedfile_truncate(db->mf, db->header.current_size);
    if (r) return r;

    r = mappedfile_commit(db->mf);
    if (r) return r;

    /* clear the dirty flag */
    db->header.flags &= ~DIRTY;
    db->header.num_records = num_records;
    r = commit_header(db);
    if (r) return r;

    if (count) *count = changed;

    return 0;
}
/* Top-level recovery driver: run the cheap in-place pass (recovery1)
 * first, and fall back to the salvage-into-new-file pass (recovery2)
 * only if that fails.  Logs a summary line on success. */
static int recovery(struct dbengine *db)
{
    clock_t start = sclock();
    int count = 0;
    int r;

    /* a clean file needs no recovery at all */
    if (db_is_clean(db))
        return 0;

    r = recovery1(db, &count);
    if (r) {
        xsyslog(LOG_ERR, "DBERROR: recovery1 failed, trying recovery2",
                         "filename=<%s>", FNAME(db));
        count = 0;
        r = recovery2(db, &count);
        if (r) return r;
    }

    syslog(LOG_INFO,
           "twoskip: recovered %s (%llu record%s, %llu bytes) in %2.3f seconds - fixed %d offset%s",
           FNAME(db),
           (LLU)db->header.num_records,
           db->header.num_records == 1 ? "" : "s",
           (LLU)(db->header.current_size),
           (sclock() - start) / (double)CLOCKS_PER_SEC,
           count,
           count == 1 ? "" : "s");

    return 0;
}
/* Exact-match fetch: look up `key` and return pointers into the mapped
 * file via *data/*datalen.  Thin wrapper over myfetch() with no
 * foundkey reporting and fetchnext=0. */
static int fetch(struct dbengine *mydb,
                 const char *key, size_t keylen,
                 const char **data, size_t *datalen,
                 struct txn **tidptr)
{
    /* a fetch without a key makes no sense */
    assert(key);
    assert(keylen);

    return myfetch(mydb, key, keylen,
                   NULL, NULL,
                   data, datalen,
                   tidptr, 0);
}
/* Fetch the first record strictly after `key`, reporting the key that
 * was found via *foundkey/*fklen.  Wrapper over myfetch() with
 * fetchnext=1.  NOTE(review): unlike fetch(), key is not asserted
 * non-NULL here — presumably a NULL key means "start from the first
 * record"; confirm against myfetch(). */
static int fetchnext(struct dbengine *mydb,
                     const char *key, size_t keylen,
                     const char **foundkey, size_t *fklen,
                     const char **data, size_t *datalen,
                     struct txn **tidptr)
{
    return myfetch(mydb, key, keylen,
                   foundkey, fklen,
                   data, datalen,
                   tidptr, 1);
}
/* Store a record only if it does not already exist (mystore with
 * force=0).  A NULL data pointer with datalen==0 stores the empty
 * value. */
static int create(struct dbengine *db,
                  const char *key, size_t keylen,
                  const char *data, size_t datalen,
                  struct txn **tid)
{
    /* a non-zero length requires a real buffer */
    if (datalen) assert(data);

    return mystore(db, key, keylen,
                   data ? data : "", datalen,
                   tid, /*force*/0);
}
/* Store a record, overwriting any existing value (mystore with
 * force=1).  A NULL data pointer with datalen==0 stores the empty
 * value. */
static int store(struct dbengine *db,
                 const char *key, size_t keylen,
                 const char *data, size_t datalen,
                 struct txn **tid)
{
    /* a non-zero length requires a real buffer */
    if (datalen) assert(data);

    return mystore(db, key, keylen,
                   data ? data : "", datalen,
                   tid, /*force*/1);
}
/* Delete a record: expressed as a store of a NULL value through
 * mystore().  `force` is passed straight through. */
static int delete(struct dbengine *db,
                  const char *key, size_t keylen,
                  struct txn **tid, int force)
{
    return mystore(db, key, keylen,
                   NULL, 0,
                   tid, force);
}
/* twoskip compar function is set at open */
static int mycompar(struct dbengine *db,
                    const char *a, int alen,
                    const char *b, int blen)
{
    /* delegate to whatever comparator was installed on this handle */
    return db->compar(a, alen, b, blen);
}
/* The twoskip backend's cyrusdb vtable.  Slot labels below are inferred
 * from the function names and the original "name" comment; confirm the
 * exact slot order against struct cyrusdb_backend in cyrusdb.h. */
HIDDEN struct cyrusdb_backend cyrusdb_twoskip =
{
    "twoskip",                  /* name */

    &cyrusdb_generic_init,      /* generic lifecycle hooks */
    &cyrusdb_generic_done,
    &cyrusdb_generic_sync,
    &cyrusdb_generic_archive,
    &cyrusdb_generic_unlink,

    &myopen,
    &myclose,

    &fetch,
    &fetch,                     /* second fetch slot — presumably the
                                 * locking-fetch variant; twoskip uses
                                 * the same implementation for both */
    &fetchnext,

    &myforeach,
    &create,
    &store,
    &delete,

    &mycommit,
    &myabort,

    &dump,
    &consistent,
    &mycheckpoint,
    &mycompar
};
File Metadata
Details
Attached
Mime Type
text/x-c
Expires
Fri, Apr 24, 1:01 PM (1 w, 2 d ago)
Storage Engine
local-disk
Storage Format
Raw Data
Storage Handle
3e/d9/f2c792363c98bdc38767f2fbf4d3
Default Alt Text
cyrusdb_twoskip.c (72 KB)
Attached To
Mode
R111 cyrus-imapd
Attached
Detach File
Event Timeline