Storage API¶
This page documents the public API of PGJsonbStorage and
PGJsonbStorageInstance, the two classes that implement ZODB storage.
PGJsonbStorage is the main storage (factory).
It manages schema initialization, OID allocation, shared state, and
creates per-connection instances.
PGJsonbStorageInstance is a per-connection MVCC instance created by
new_instance().
Each ZODB Connection receives its own instance with an independent
PostgreSQL connection for snapshot isolation.
PGJsonbStorage¶
from zodb_pgjsonb.storage import PGJsonbStorage
Declared interfaces: IPGJsonbStorage, IMVCCStorage, IBlobStorage,
IStorageUndoable, IStorageIteration, IStorageRestoreable.
Inherits from: ConflictResolvingStorage, BaseStorage.
Constructor¶
PGJsonbStorage(
dsn: str,
name: str = "pgjsonb",
history_preserving: bool = False,
blob_temp_dir: str | None = None,
cache_local_mb: int = 16,
pool_size: int = 1,
pool_max_size: int = 10,
pool_timeout: float = 30.0,
s3_client: S3Client | None = None,
blob_cache: S3BlobCache | None = None,
blob_threshold: int = 102_400,
)
See Configuration options for parameter descriptions.
The constructor connects to PostgreSQL, creates a connection pool, installs the schema, and restores max OID and last TID from existing data.
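A minimal wiring sketch (the DSN and pool sizes are illustrative placeholders; `ZODB.DB` is the standard ZODB database object, whose Connections each call `new_instance()`):

```python
# Hedged sketch: wiring PGJsonbStorage into a ZODB database.
# The DSN is a placeholder; adjust pool sizes to your deployment.
import ZODB
from zodb_pgjsonb.storage import PGJsonbStorage

storage = PGJsonbStorage(
    "dbname=app user=zope host=localhost",
    history_preserving=True,   # enables undo support
    pool_size=2,
    pool_max_size=20,
)
db = ZODB.DB(storage)          # each ZODB Connection gets its own instance
with db.transaction() as conn:
    conn.root()["counter"] = 0
db.close()                     # closes the pool and removes the blob temp dir
```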
IMVCCStorage methods¶
new_instance() -> PGJsonbStorageInstance
Create a per-connection storage instance. Each ZODB Connection gets its own instance with an independent PostgreSQL connection.

release() -> None
Release resources. No-op on the main storage.

poll_invalidations() -> list[bytes]
Poll for invalidations. No-op on the main storage; returns [].

sync(force: bool = True) -> None
Sync snapshot. No-op on the main storage.
IStorage read methods¶
load(oid: bytes, version: str = "") -> tuple[bytes, bytes]
Load the current object state. Returns (pickle_bytes, tid_bytes). Raises POSKeyError if the object does not exist. Results are cached in the per-instance LRU cache.

loadBefore(oid: bytes, tid: bytes) -> tuple[bytes, bytes, bytes] | None
Load object data before a given TID. Returns (data, serial, end_serial) or None. In history-preserving mode, queries object_history. In history-free mode, returns the current state if its TID is less than the requested TID.

loadSerial(oid: bytes, serial: bytes) -> bytes
Load a specific revision of an object. Returns pickle bytes. Raises POSKeyError if the revision does not exist. Checks the serial cache first (needed for conflict resolution in history-free mode).
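All oid, tid, and serial arguments are 8-byte big-endian values, the encoding ZODB uses throughout (ZODB.utils.p64/u64 perform the same conversion). A minimal stdlib sketch:

```python
import struct

def oid_to_bytes(n: int) -> bytes:
    """Pack an integer zoid as the 8-byte big-endian form load() expects."""
    return struct.pack(">Q", n)

def bytes_to_oid(b: bytes) -> int:
    """Unpack an 8-byte OID/TID back to an integer (the zoid column value)."""
    return struct.unpack(">Q", b)[0]

root_oid = oid_to_bytes(0)               # the ZODB root object
print(root_oid)                          # b'\x00\x00\x00\x00\x00\x00\x00\x00'
print(bytes_to_oid(oid_to_bytes(42)))    # 42
```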
IStorage write methods¶
store(oid: bytes, serial: bytes, data: bytes, version: str, transaction) -> None
Queue an object for storage during the current transaction. Decodes the pickle to JSON via zodb-json-codec, runs state processors, and defers conflict detection to _vote().

checkCurrentSerialInTransaction(oid: bytes, serial: bytes, transaction) -> None
Queue a read-conflict check for batch verification in _vote().
IStorage two-phase commit¶
tpc_begin(tid, u, d, e) -> None
Begin a two-phase commit. Starts a PostgreSQL transaction and acquires advisory lock 0.

tpc_vote(transaction) -> list | None
Flush pending stores and blobs to PostgreSQL. Performs batch conflict detection, then writes the transaction log, objects, and blobs. Calls finalize() on each registered state processor. Returns a list of resolved conflict OIDs, or None.

tpc_finish(transaction, f: callable | None = None) -> bytes
Commit the PostgreSQL transaction and run the optional callback. Returns the transaction's TID bytes.

tpc_abort(transaction) -> None
Roll back the PostgreSQL transaction and clean up temp blob files.
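The call order ZODB's transaction machinery imposes on these methods can be illustrated with a recording stub (a sketch of the protocol sequence, not the real storage class):

```python
# Illustrative stand-in that records the two-phase-commit call order
# ZODB drives: begin, store(s), vote, finish. Not the real PGJsonbStorage.
class RecordingStorage:
    def __init__(self):
        self.calls = []

    def tpc_begin(self, txn):
        self.calls.append("tpc_begin")    # start PG txn, take advisory lock 0

    def store(self, oid, serial, data, version, txn):
        self.calls.append("store")        # queue the write; no conflict check yet

    def tpc_vote(self, txn):
        self.calls.append("tpc_vote")     # flush + batch conflict detection
        return None

    def tpc_finish(self, txn, f=None):
        self.calls.append("tpc_finish")   # COMMIT; returns the new TID
        return b"\x00" * 8

s = RecordingStorage()
txn = object()
s.tpc_begin(txn)
s.store(b"\x00" * 8, b"\x00" * 8, b"pickle", "", txn)
s.tpc_vote(txn)
s.tpc_finish(txn)
print(s.calls)  # ['tpc_begin', 'store', 'tpc_vote', 'tpc_finish']
```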
IStorage metadata methods¶
lastTransaction() -> bytes
Return the TID of the last committed transaction.

__len__() -> int
Return the approximate number of objects in object_state.

getSize() -> int
Return the approximate total size in bytes (sum of state_size).

history(oid: bytes, size: int = 1) -> list[dict]
Return revision history for an object. Each dict contains: time, tid, serial, user_name, description, size. In history-preserving mode, queries both object_state and object_history.

pack(t: float, referencesf) -> None
Pack the storage by removing unreachable objects and their blobs. In history-preserving mode, also removes old revisions before the pack time. Cleans up S3 blobs for deleted objects.

close() -> None
Close all database connections and the connection pool, and remove the blob temp directory.
IStorageUndoable methods¶
supportsUndo() -> bool
Returns True in history-preserving mode, False otherwise.

undoLog(first: int = 0, last: int = -20, filter: callable | None = None) -> list[dict]
Return a list of transaction descriptions for undo. Each dict contains: id (TID bytes), time, user_name (bytes), description (bytes), plus any extension metadata. Returns [] in history-free mode. Excludes transactions at or before the last pack time.

undo(transaction_id: bytes, transaction=None) -> tuple[bytes, list[bytes]]
Undo a transaction by restoring previous object states. Returns (tid, [oid_bytes, ...]) for the new undo transaction. Raises UndoError in history-free mode.
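In practice the undo methods are usually reached through the ZODB database object rather than called on the storage directly. A hedged sketch (assumes history_preserving=True and an open ZODB.DB named db wrapping this storage):

```python
# Hedged sketch: browsing and undoing recent transactions via ZODB.DB,
# which delegates to the storage's undoLog()/undo() documented above.
import transaction

for entry in db.undoLog(0, 5):              # five most recent undoable txns
    print(entry["id"], entry["description"])

db.undo(entry["id"])                        # queue undo of the oldest listed
transaction.commit()                        # the undo takes effect at commit
```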
IStorageIteration methods¶
iterator(start: bytes | None = None, stop: bytes | None = None) -> Iterator[TransactionRecord]
Iterate over transactions, yielding TransactionRecord objects. Borrows a connection from the pool for the duration of the iteration. In history-free mode, each object appears once at its current TID. In history-preserving mode, all revisions are included.
IStorageRestoreable methods¶
restore(oid: bytes, serial: bytes, data: bytes, version: str, prev_txn, transaction) -> None
Write pre-committed data without conflict checking. Used by copyTransactionsFrom() and zodbconvert.

restoreBlob(oid: bytes, serial: bytes, data: bytes, blobfilename: str, prev_txn, transaction) -> None
Restore object data and a blob file without conflict checking.

copyTransactionsFrom(other, workers=1, start_tid=None, blob_mode="inline")
Copy all transactions from another storage.
Parameters:
- other: source storage (must implement IStorageIteration)
- workers: number of parallel writer threads (default: 1 = sequential)
- start_tid: resume from this TID (bytes, optional)
- blob_mode: S3 blob upload strategy:
  - "inline" (default): upload synchronously in worker threads
  - "background": upload via a background thread pool (PG writes continue)
  - "deferred:/path/to/manifest.tsv": write a manifest, upload later
Blob files are copied (not moved) to preserve source storage integrity. When workers > 1, parallel PostgreSQL connections are used for concurrent writes. The main thread reads from the source, decodes pickles, and tracks OID-level dependencies to guarantee correct write ordering. When start_tid is set, iteration begins at that TID (for incremental imports). A migration_watermark table tracks contiguous commit progress during parallel copies so interrupted imports can resume safely without losing transactions.
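A hedged migration sketch (the path and DSN are placeholders; the source only needs to implement IStorageIteration):

```python
# Hedged sketch: migrating from FileStorage with copyTransactionsFrom().
from ZODB.FileStorage import FileStorage
from zodb_pgjsonb.storage import PGJsonbStorage

src = FileStorage("/var/zope/Data.fs", read_only=True)
dst = PGJsonbStorage("dbname=app user=zope host=localhost")
dst.copyTransactionsFrom(
    src,
    workers=4,                 # parallel PostgreSQL writer connections
    blob_mode="background",    # PG writes continue while S3 uploads run
)
src.close()
dst.close()
```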
IBlobStorage methods¶
storeBlob(oid: bytes, oldserial: bytes, data: bytes, blobfilename: str, version: str, transaction) -> None
Store object data and a blob file. Calls store() for the object data and stages the blob file to a stable temp location.

loadBlob(oid: bytes, serial: bytes) -> str
Return the path to a file containing the blob data. Uses deterministic filenames ({oid:016x}-{tid:016x}.blob) so repeated calls return the same path. Checks pending blobs, the local cache, the S3 blob cache, and then the database, in that order.

openCommittedBlobFile(oid: bytes, serial: bytes, blob=None) -> IO
Open a committed blob file for reading. Returns a file object or BlobFile.

temporaryDirectory() -> str
Return the directory path for uncommitted blob data.
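At the application level these methods are driven by ZODB's Blob type rather than called directly. A hedged sketch (assumes an open ZODB.DB named db; the 200 KB payload is chosen to exceed the default blob_threshold of 102,400 bytes, so it would go to S3 when an S3 client is configured):

```python
# Hedged sketch: the Blob API drives storeBlob()/loadBlob() under the hood.
from ZODB.blob import Blob

with db.transaction() as conn:
    conn.root()["report"] = blob = Blob()
    with blob.open("w") as f:
        f.write(b"a" * 200_000)    # above blob_threshold

with db.transaction() as conn:
    with conn.root()["report"].open("r") as f:   # served via loadBlob()
        data = f.read()
```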
Blob statistics methods¶
get_blob_stats() -> dict
Return blob storage statistics. The returned dict contains:

| Key | Type | Description |
| --- | --- | --- |
| available | bool | Whether the blob_state table exists. |
| total_blobs | int | Total number of blob rows. |
| unique_objects | int | Number of distinct OIDs with blobs. |
| avg_versions | float | Average blob versions per object. |
| total_size | int | Total blob size in bytes. |
| total_size_display | str | Human-readable total size. |
| pg_count | int | Number of PG-stored blobs. |
| pg_size | int | Total size of PG-stored blobs in bytes. |
| pg_size_display | str | Human-readable PG blob size. |
| s3_count | int | Number of S3-stored blobs. |
| s3_size | int | Total size of S3-stored blobs in bytes. |
| s3_size_display | str | Human-readable S3 blob size. |
| largest_blob | int | Size of the largest blob in bytes. |
| largest_blob_display | str | Human-readable largest blob size. |
| avg_blob_size | int | Average blob size in bytes. |
| avg_blob_size_display | str | Human-readable average blob size. |
| s3_configured | bool | Whether an S3 client is configured. |
| blob_threshold | int | Current blob threshold in bytes. |
| blob_threshold_display | str | Human-readable blob threshold. |

Returns {"available": False} if the blob_state table does not exist.

get_blob_histogram() -> list[dict]
Return the blob size distribution as logarithmic buckets. Each dict in the list contains:
| Key | Type | Description |
| --- | --- | --- |
| label | str | Bucket range label (for example, "100.0 KB -- 1.0 MB"). |
| count | int | Number of blobs in this bucket. |
| pct | int | Percentage relative to the largest bucket (0–100). |
| tier | str | "pg", "s3", "mixed", or "" (no S3 configured). |

Returns [] if no blobs exist.
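Since both methods return plain dicts and lists, rendering them is straightforward. A sketch that turns get_blob_histogram() output into a text bar chart (the histogram below is sample data in the documented shape, not live output):

```python
# Sample data in the documented bucket shape; not live storage output.
histogram = [
    {"label": "10.0 KB -- 100.0 KB", "count": 840, "pct": 100, "tier": "pg"},
    {"label": "100.0 KB -- 1.0 MB", "count": 210, "pct": 25, "tier": "mixed"},
    {"label": "1.0 MB -- 10.0 MB", "count": 42, "pct": 5, "tier": "s3"},
]

def render(buckets, width=40):
    """Render histogram buckets as right-aligned labels with '#' bars."""
    lines = []
    for b in buckets:
        bar = "#" * max(1, b["pct"] * width // 100)
        lines.append(f"{b['label']:>22} {bar} {b['count']} ({b['tier']})")
    return "\n".join(lines)

print(render(histogram))
```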
History mode conversion methods¶
convert_to_history_free() -> dict
Convert a history-preserving database to history-free mode. Drops the object_history, pack_state, and deprecated blob_history tables. Removes old blob versions from blob_state (keeping only the latest TID per zoid) and orphaned transaction_log entries. Returns a dict with counts: history_rows, pack_rows, blob_history_rows, old_blob_versions, orphan_transactions. Raises RuntimeError if no history tables exist.

convert_to_history_preserving() -> None
Convert a history-free database to history-preserving mode. Creates the object_history and pack_state tables. Existing objects gain history tracking on their next modification. Raises RuntimeError if history tables already exist.

compact_history() -> tuple[int, int]
Remove duplicate entries created by the old dual-write mode. Returns (deleted_object_history_rows, deleted_blob_history_rows). Returns (0, 0) in history-free mode.
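A hedged sketch of a one-time conversion from a maintenance script (assumes no live Zope clients and an already-constructed storage):

```python
# Hedged sketch: converting to history-free mode and reporting the counts
# documented above. Run offline; the conversion drops history tables.
counts = storage.convert_to_history_free()
print(
    f"dropped {counts['history_rows']} history rows, "
    f"removed {counts['old_blob_versions']} old blob versions, "
    f"pruned {counts['orphan_transactions']} orphan transactions"
)
```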
Prefetch registration¶
register_prefetch_refs_expr(sql_expr: str | None) -> None
Register a SQL expression for automatic refs prefetching on load(). Added in v1.9.2.

When set, load() includes the expression as an extra column aliased refs in its SELECT query. If the expression evaluates to a non-NULL integer array, the referenced objects are prefetched into the load cache via load_multiple(). This turns N+1 individual loads into 1+1 batch loads when traversing object trees.

Call with None to disable prefetching (the default).

The expression is arbitrary SQL evaluated per row, so it can filter which objects trigger prefetching. For example, plone-pgcatalog registers an expression that only prefetches for cataloged content objects:

storage.register_prefetch_refs_expr(
    "CASE WHEN idx IS NOT NULL THEN refs END"
)

Non-content objects (BTrees, PersistentMappings, and so on) have idx IS NULL and produce a NULL result, suppressing prefetching. This avoids the cascade problem where internal ZODB structures reference thousands of objects that are never accessed.
State processor registration¶
register_state_processor(processor) -> NoneRegister a state processor plugin. See State processor API for the processor protocol.
Deferred startup actions¶
defer_startup_action(action: callable, name: str) -> None
Defer a callable to the first write transaction. Added in v1.10.3.

At Zope startup, a ZODB Connection holds an open REPEATABLE READ snapshot with an ACCESS SHARE lock on object_state. DDL statements (ALTER TABLE, CREATE INDEX) need ACCESS EXCLUSIVE, which would deadlock against that snapshot.

This method queues a callable that will be executed during the first tpc_begin() call, once the read snapshot has been committed and its locks released. The callable receives the DSN string as its only argument and should open its own autocommit connection.

def create_my_index(dsn):
    import psycopg

    with psycopg.connect(dsn, autocommit=True) as conn:
        conn.execute("SET lock_timeout = '30s'")
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_my_field "
            "ON object_state ((idx->>'my_field')) "
            "WHERE idx IS NOT NULL"
        )

storage.defer_startup_action(create_my_index, "my_plugin_indexes")

State processor DDL (from get_schema_sql()) is automatically deferred; plugins only need this method for additional DDL that is not part of the processor's schema.
PGJsonbStorageInstance¶
from zodb_pgjsonb.storage import PGJsonbStorageInstance
Declared interfaces: IBlobStorage.
Inherits from: ConflictResolvingStorage.
Properties¶
pg_connection -> psycopg.Connection
The underlying psycopg connection (read-only). Shares the same REPEATABLE READ snapshot used for ZODB loads, so external queries see a consistent point-in-time view.
IMVCCStorage methods¶
new_instance() -> PGJsonbStorageInstance
Delegate to the main storage's new_instance().

release() -> None
Return the connection to the pool, end any active read transaction, and remove the blob temp directory.

poll_invalidations() -> list[bytes]
Return OIDs changed since the last poll. Ends any previous read snapshot, starts a new REPEATABLE READ snapshot, queries for changes, and invalidates affected cache entries. Returns [] if nothing changed.

sync(force: bool = True) -> None
Sync snapshot. No-op (autocommit mode provides the latest committed data).
IStorage read methods¶
load(oid: bytes, version: str = "") -> tuple[bytes, bytes]
Load the current object state. Returns (pickle_bytes, tid_bytes). Results are cached in the per-instance LRU cache. When a prefetch refs expression is registered (see register_prefetch_refs_expr()), load also fetches the refs column and prefetches all referenced objects via load_multiple().

load_multiple(oids: Iterable[bytes]) -> dict[bytes, tuple[bytes, bytes]]
Load multiple objects in a single SQL query. Added in v1.8.0.
Returns a dict mapping oid_bytes -> (pickle_bytes, tid_bytes). Only includes OIDs that exist; missing OIDs are silently omitted. Checks the per-instance LRU cache first and only queries the database for cache misses, using a single SELECT ... WHERE zoid = ANY(...) statement. All results are cached.
This method is the foundation for batch object loading. plone-pgcatalog uses it to prefetch the next N objects when brain.getObject() is called on a search result, reducing per-object SQL roundtrips from N individual queries to 1 batch query.

loadBefore(oid: bytes, tid: bytes) -> tuple[bytes, bytes, bytes] | None
Load object data before a given TID.

loadSerial(oid: bytes, serial: bytes) -> bytes
Load a specific revision of an object.
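load_multiple() accepts any iterable of OIDs, but very large batches may be worth splitting so each SELECT ... = ANY(...) stays bounded. A sketch of that batching pattern (chunked and prefetch are hypothetical helpers, not part of the API):

```python
# Hypothetical helpers for batched prefetching on top of load_multiple().
def chunked(items, size):
    """Yield successive fixed-size chunks from an iterable."""
    items = list(items)
    for i in range(0, len(items), size):
        yield items[i : i + size]

def prefetch(instance, oids, chunk_size=500):
    """Load many OIDs via load_multiple(), one SQL query per chunk."""
    results = {}
    for chunk in chunked(oids, chunk_size):
        results.update(instance.load_multiple(chunk))
    return results

print(list(chunked(range(5), 2)))  # [[0, 1], [2, 3], [4]]
```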
IStorage write methods¶
new_oid() -> bytes
Delegate OID allocation to the main storage.

store(oid: bytes, serial: bytes, data: bytes, version: str, transaction) -> None
Queue an object for storage. Conflict detection is deferred to tpc_vote().

checkCurrentSerialInTransaction(oid: bytes, serial: bytes, transaction) -> None
Queue a read-conflict check for batch verification in tpc_vote().
Two-phase commit¶
tpc_begin(transaction, tid: bytes | None = None, status: str = " ") -> None
Begin a two-phase commit. Ends any active read snapshot, applies deferred DDL, starts a PostgreSQL transaction, acquires advisory lock 0, and generates a TID.

tpc_vote(transaction) -> list | None
Flush pending stores and blobs to PostgreSQL. Performs batch conflict detection, writes all data, and calls finalize() on each state processor.

tpc_finish(transaction, f: callable | None = None) -> bytes
Commit the PostgreSQL transaction. Updates the main storage's last TID and runs the optional callback. Returns TID bytes.

tpc_abort(transaction) -> None
Roll back the PostgreSQL transaction and clean up temp blob files.
IBlobStorage methods¶
storeBlob(oid: bytes, oldserial: bytes, data: bytes, blobfilename: str, version: str, transaction) -> NoneStore object data and a blob file.
loadBlob(oid: bytes, serial: bytes) -> strReturn the path to a file containing the blob data.
openCommittedBlobFile(oid: bytes, serial: bytes, blob=None) -> IOOpen a committed blob file for reading.
temporaryDirectory() -> strReturn the directory path for uncommitted blob data.
IStorageRestoreable methods¶
restore(oid: bytes, serial: bytes, data: bytes, version: str, prev_txn, transaction) -> NoneWrite pre-committed data without conflict checking.
restoreBlob(oid: bytes, serial: bytes, data: bytes, blobfilename: str, prev_txn, transaction) -> NoneRestore object data and a blob file without conflict checking.
Metadata (delegated to main storage)¶
sortKey() -> strDelegate to main storage.
getName() -> strDelegate to main storage.
__name__ -> strProperty; delegates to main storage.
isReadOnly() -> bool
Returns False.

lastTransaction() -> bytes
Return the main storage's last TID.
__len__() -> intDelegate to main storage.
getSize() -> intDelegate to main storage.
history(oid: bytes, size: int = 1) -> list[dict]Delegate to main storage.
pack(t: float, referencesf) -> NoneDelegate to main storage.
supportsUndo() -> boolDelegate to main storage.
undoLog(first: int = 0, last: int = -20, filter: callable | None = None) -> list[dict]Delegate to main storage.
undoInfo(first: int = 0, last: int = -20, specification: dict | None = None) -> list[dict]Delegate to main storage.
undo(transaction_id: bytes, transaction=None) -> tuple[bytes, list[bytes]]Undo a transaction using this instance’s connection.
registerDB(db) -> NoneNo-op.
close() -> None
Calls release().
LoadCache¶
from zodb_pgjsonb.storage import LoadCache
Bounded LRU cache for load() results.
Stores (pickle_bytes, tid_bytes) keyed by zoid (int).
Each PGJsonbStorageInstance has its own cache; thread safety is not required.
Constructor¶
LoadCache(max_mb: int = 16)
Methods¶
get(zoid: int) -> tuple[bytes, bytes] | None
Look up by zoid. Returns (data, tid) or None. Promotes the entry on a hit.

set(zoid: int, data: bytes, tid: bytes) -> None
Store (data, tid) for a zoid. Evicts LRU entries if the cache exceeds the byte budget.

invalidate(zoid: int) -> None
Remove a single zoid from the cache.

clear() -> None
Remove all entries.
Properties¶
size_mb -> float
Current cache size in megabytes.
Attributes¶
hits: int
Number of cache hits.

misses: int
Number of cache misses.
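The cache's contract can be illustrated with a simplified pure-Python sketch (a byte-budget LRU over an OrderedDict; the real LoadCache lives in zodb_pgjsonb.storage and may differ in detail):

```python
# Simplified illustration of LoadCache's behavior, not the real class.
from collections import OrderedDict

class TinyLoadCache:
    def __init__(self, max_bytes):
        self.max_bytes = max_bytes
        self._data = OrderedDict()   # zoid -> (data, tid), LRU order
        self._size = 0
        self.hits = self.misses = 0

    def get(self, zoid):
        entry = self._data.get(zoid)
        if entry is None:
            self.misses += 1
            return None
        self._data.move_to_end(zoid)  # promote on hit
        self.hits += 1
        return entry

    def set(self, zoid, data, tid):
        if zoid in self._data:
            old_data, old_tid = self._data.pop(zoid)
            self._size -= len(old_data) + len(old_tid)
        self._data[zoid] = (data, tid)
        self._size += len(data) + len(tid)
        while self._size > self.max_bytes and self._data:
            _, (d, t) = self._data.popitem(last=False)  # evict LRU entry
            self._size -= len(d) + len(t)

cache = TinyLoadCache(max_bytes=24)
cache.set(1, b"x" * 8, b"t" * 8)   # 16 bytes cached
cache.set(2, b"y" * 8, b"t" * 8)   # 32 bytes total -> evicts zoid 1
print(cache.get(1), cache.get(2) is not None)  # None True
```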
Packer¶
from zodb_pgjsonb.packer import pack
pack(conn, pack_time: bytes | None = None, history_preserving: bool = False) -> tuple[int, int, list[str]]
Remove unreachable objects and their blobs. Uses a recursive CTE to find all objects reachable from the root (zoid 0). Returns (deleted_objects, deleted_blobs, s3_keys_to_delete). The caller is responsible for deleting the S3 objects in the returned key list.

In history-preserving mode with pack_time, objects created or modified after pack_time are preserved even if currently unreachable. Old revisions in object_history and blob_state before pack_time are removed for reachable objects. Orphaned transaction_log entries at or before pack_time are deleted.
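The reachability rule the packer implements can be sketched in pure Python: the recursive CTE computes the transitive closure of references from the root, and everything outside that set is garbage. An illustration, not the packer's actual code:

```python
# Toy reference graph standing in for the stored reference arrays;
# zoids 4 and 5 are unreachable from the root (zoid 0).
refs = {0: [1, 2], 1: [3], 2: [], 3: [], 4: [5], 5: []}

def reachable(refs, root=0):
    """Depth-first transitive closure from the root zoid."""
    seen, stack = set(), [root]
    while stack:
        zoid = stack.pop()
        if zoid in seen:
            continue
        seen.add(zoid)
        stack.extend(refs.get(zoid, []))
    return seen

garbage = set(refs) - reachable(refs)
print(sorted(garbage))  # [4, 5]
```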
ZMI integration¶
The zodb_pgjsonb.zmi module patches App.ApplicationManager.AltDatabaseManager
to add a “Blob Storage” tab in the Zope Management Interface when the database
uses PGJsonbStorage.
The patch is applied automatically during ZConfig factory initialization when
Zope (App package) is available.
The tab displays blob statistics from get_blob_stats() and
get_blob_histogram().
Access requires the Manager role.