Storage API¶
This page documents the public API of PGJsonbStorage and
PGJsonbStorageInstance, the two classes that implement ZODB storage.
PGJsonbStorage is the main storage (factory).
It manages schema initialization, OID allocation, shared state, and
creates per-connection instances.
PGJsonbStorageInstance is a per-connection MVCC instance created by
new_instance().
Each ZODB Connection receives its own instance with an independent
PostgreSQL connection for snapshot isolation.
PGJsonbStorage¶
from zodb_pgjsonb.storage import PGJsonbStorage
Declared interfaces: IPGJsonbStorage, IMVCCStorage, IBlobStorage,
IStorageUndoable, IStorageIteration, IStorageRestoreable.
Inherits from: ConflictResolvingStorage, BaseStorage.
Constructor¶
PGJsonbStorage(
dsn: str,
name: str = "pgjsonb",
history_preserving: bool = False,
blob_temp_dir: str | None = None,
cache_local_mb: int = 16,
pool_size: int = 1,
pool_max_size: int = 10,
pool_timeout: float = 30.0,
s3_client: S3Client | None = None,
blob_cache: S3BlobCache | None = None,
blob_threshold: int = 102_400,
)
See Configuration options for parameter descriptions.
The constructor connects to PostgreSQL, creates a connection pool, installs the schema, and restores max OID and last TID from existing data.
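A typical construction looks like the following sketch; the DSN and option values here are placeholders for your environment, not recommendations:

```python
from zodb_pgjsonb.storage import PGJsonbStorage

# Hypothetical connection settings -- adjust for your deployment.
storage = PGJsonbStorage(
    "dbname=app user=zope host=localhost",
    name="app",
    history_preserving=True,    # enables undo support
    pool_max_size=20,
    blob_threshold=256 * 1024,  # blob placement cutoff (see Configuration options)
)
```

The resulting storage can be passed to ZODB.DB() like any other ZODB storage.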
IMVCCStorage methods¶
new_instance() -> PGJsonbStorageInstance
Create a per-connection storage instance. Each ZODB Connection gets its own instance with an independent PostgreSQL connection.
release() -> None
Release resources. No-op on the main storage.
poll_invalidations() -> list[bytes]
Poll for invalidations. No-op on the main storage; returns [].
sync(force: bool = True) -> None
Sync snapshot. No-op on the main storage.
IStorage read methods¶
load(oid: bytes, version: str = "") -> tuple[bytes, bytes]
Load the current object state. Returns (pickle_bytes, tid_bytes). Raises POSKeyError if the object does not exist. Results are cached in the per-instance LRU cache.
loadBefore(oid: bytes, tid: bytes) -> tuple[bytes, bytes, bytes] | None
Load object data before a given TID. Returns (data, serial, end_serial) or None. In history-preserving mode, queries object_history. In history-free mode, returns the current state if its TID is less than the requested TID.
loadSerial(oid: bytes, serial: bytes) -> bytes
Load a specific revision of an object. Returns pickle bytes. Raises POSKeyError if the revision does not exist. Checks the serial cache first (needed for conflict resolution in history-free mode).
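All oid, tid, and serial arguments above are 8-byte big-endian integers, per the ZODB convention. That is why loadBefore can compare TIDs directly as bytes: fixed-width big-endian values sort byte-wise in the same order as their numeric values. A minimal sketch of the encoding, equivalent to ZODB.utils.p64/u64:

```python
import struct

def p64(n: int) -> bytes:
    """Pack an int into the 8-byte big-endian form used for OIDs and TIDs
    (equivalent to ZODB.utils.p64)."""
    return struct.pack(">Q", n)

def u64(b: bytes) -> int:
    """Unpack an 8-byte OID/TID back into an int (ZODB.utils.u64)."""
    return struct.unpack(">Q", b)[0]

# Fixed-width big-endian bytes sort in the same order as the integers
# they encode, so TID comparisons can be done on the raw bytes.
assert p64(5) < p64(1000) and u64(p64(1000)) == 1000
```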
IStorage write methods¶
store(oid: bytes, serial: bytes, data: bytes, version: str, transaction) -> None
Queue an object for storage during the current transaction. Decodes the pickle to JSON via zodb-json-codec, runs state processors, and defers conflict detection to _vote().
checkCurrentSerialInTransaction(oid: bytes, serial: bytes, transaction) -> None
Queue a read-conflict check for batch verification in _vote().
IStorage two-phase commit¶
tpc_begin(tid, u, d, e) -> None
Begin a two-phase commit. Starts a PostgreSQL transaction and acquires advisory lock 0.
tpc_vote(transaction) -> list | None
Flush pending stores and blobs to PostgreSQL. Performs batch conflict detection; writes the transaction log, objects, and blobs. Calls finalize() on each registered state processor. Returns a list of resolved conflict OIDs, or None.
tpc_finish(transaction, f: callable | None = None) -> bytes
Commit the PostgreSQL transaction and run the optional callback. Returns the transaction's TID bytes.
tpc_abort(transaction) -> None
Roll back the PostgreSQL transaction and clean up temp blob files.
IStorage metadata methods¶
lastTransaction() -> bytes
Return the TID of the last committed transaction.
__len__() -> int
Return the approximate number of objects in object_state.
getSize() -> int
Return the approximate total size in bytes (sum of state_size).
history(oid: bytes, size: int = 1) -> list[dict]
Return revision history for an object. Each dict contains: time, tid, serial, user_name, description, size. In history-preserving mode, queries both object_state and object_history.
pack(t: float, referencesf) -> None
Pack the storage by removing unreachable objects and their blobs. In history-preserving mode, removes old revisions before the pack time. Cleans up S3 blobs for deleted objects.
close() -> None
Close all database connections and the connection pool, and remove the blob temp directory.
IStorageUndoable methods¶
supportsUndo() -> bool
Returns True in history-preserving mode, False otherwise.
undoLog(first: int = 0, last: int = -20, filter: callable | None = None) -> list[dict]
Return a list of transaction descriptions for undo. Each dict contains: id (TID bytes), time, user_name (bytes), description (bytes), plus any extension metadata. Returns [] in history-free mode. Excludes transactions at or before the last pack time.
undo(transaction_id: bytes, transaction=None) -> tuple[bytes, list[bytes]]
Undo a transaction by restoring previous object states. Returns (tid, [oid_bytes, ...]) for the new undo transaction. Raises UndoError in history-free mode.
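The first/last pair follows the usual ZODB undoLog convention, in which a negative last is a count rather than an index. A sketch of that normalization (undo_log_range is an illustrative helper, not part of the API):

```python
def undo_log_range(first: int, last: int) -> range:
    """Resolve undoLog's (first, last) arguments into an index range.

    Following the usual ZODB convention, a negative `last` is a count
    relative to `first`, so the defaults (first=0, last=-20) request
    the 20 most recent transactions.
    """
    if last < 0:
        last = first - last  # e.g. first=0, last=-20 -> last=20
    return range(first, last)

assert list(undo_log_range(0, -20)) == list(range(20))
```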
IStorageIteration methods¶
iterator(start: bytes | None = None, stop: bytes | None = None) -> Iterator[TransactionRecord]
Iterate over transactions, yielding TransactionRecord objects. Borrows a connection from the pool for the duration of iteration. In history-free mode, each object appears once at its current TID. In history-preserving mode, all revisions are included.
IStorageRestoreable methods¶
restore(oid: bytes, serial: bytes, data: bytes, version: str, prev_txn, transaction) -> None
Write pre-committed data without conflict checking. Used by copyTransactionsFrom() and zodbconvert.
restoreBlob(oid: bytes, serial: bytes, data: bytes, blobfilename: str, prev_txn, transaction) -> None
Restore object data and a blob file without conflict checking.
copyTransactionsFrom(other, workers: int = 1) -> None
Copy all transactions from another storage, including blobs. Blob files are copied (not moved) to preserve source storage integrity. When workers > 1, uses parallel PostgreSQL connections for concurrent writes. The main thread reads from the source, decodes pickles, and tracks OID-level dependencies to guarantee correct write ordering.
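The OID-level dependency tracking used for workers > 1 can be pictured as follows. This is an illustrative sketch, not the actual implementation: before a worker writes a transaction, it waits on the batch that last wrote any OID the transaction touches, so revisions of the same object are never written out of order.

```python
from concurrent.futures import ThreadPoolExecutor, wait

def copy_in_parallel(transactions, write_batch, workers=4):
    """Illustrative sketch of OID-ordered parallel writes.

    `transactions` yields (tid, oids, payload) in commit order and
    `write_batch` performs the actual write (hypothetical callables).
    """
    last_write = {}  # oid -> future of the last transaction that wrote it
    with ThreadPoolExecutor(max_workers=workers) as pool:
        for _tid, oids, payload in transactions:
            deps = {last_write[oid] for oid in oids if oid in last_write}
            def task(payload=payload, deps=deps):
                wait(deps)  # earlier revisions of these OIDs must land first
                return write_batch(payload)
            future = pool.submit(task)
            for oid in oids:
                last_write[oid] = future
```

Because transactions are dispatched in commit order, a task only ever waits on futures submitted before it, so the scheme cannot deadlock.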
IBlobStorage methods¶
storeBlob(oid: bytes, oldserial: bytes, data: bytes, blobfilename: str, version: str, transaction) -> None
Store object data and a blob file. Calls store() for the object data and stages the blob file to a stable temp location.
loadBlob(oid: bytes, serial: bytes) -> str
Return the path to a file containing the blob data. Uses deterministic filenames ({oid:016x}-{tid:016x}.blob) so repeated calls return the same path. Checks pending blobs, the local cache, the S3 blob cache, and then the database, in that order.
openCommittedBlobFile(oid: bytes, serial: bytes, blob=None) -> IO
Open a committed blob file for reading. Returns a file object or BlobFile.
temporaryDirectory() -> str
Return the directory path for uncommitted blob data.
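As a sketch, a deterministic blob path can be derived from the raw OID and serial like this; blob_cache_path is a hypothetical helper, and only the {oid:016x}-{tid:016x}.blob pattern comes from the API above:

```python
import os

def blob_cache_path(cache_dir: str, oid: bytes, serial: bytes) -> str:
    """Hypothetical helper showing the deterministic filename pattern
    ({oid:016x}-{tid:016x}.blob) described for loadBlob()."""
    oid_int = int.from_bytes(oid, "big")
    tid_int = int.from_bytes(serial, "big")
    return os.path.join(cache_dir, f"{oid_int:016x}-{tid_int:016x}.blob")

# The same (oid, serial) pair always yields the same path, so repeated
# loadBlob() calls can share one cached file.
path = blob_cache_path("cache", b"\x00" * 8, b"\x00" * 7 + b"\x01")
assert path.endswith("0000000000000000-0000000000000001.blob")
```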
Blob statistics methods¶
get_blob_stats() -> dict
Return blob storage statistics. The returned dict contains:

| Key | Type | Description |
| --- | --- | --- |
| available | bool | Whether the blob_state table exists. |
| total_blobs | int | Total number of blob rows. |
| unique_objects | int | Number of distinct OIDs with blobs. |
| avg_versions | float | Average blob versions per object. |
| total_size | int | Total blob size in bytes. |
| total_size_display | str | Human-readable total size. |
| pg_count | int | Number of PG-stored blobs. |
| pg_size | int | Total size of PG-stored blobs in bytes. |
| pg_size_display | str | Human-readable PG blob size. |
| s3_count | int | Number of S3-stored blobs. |
| s3_size | int | Total size of S3-stored blobs in bytes. |
| s3_size_display | str | Human-readable S3 blob size. |
| largest_blob | int | Size of the largest blob in bytes. |
| largest_blob_display | str | Human-readable largest blob size. |
| avg_blob_size | int | Average blob size in bytes. |
| avg_blob_size_display | str | Human-readable average blob size. |
| s3_configured | bool | Whether an S3 client is configured. |
| blob_threshold | int | Current blob threshold in bytes. |
| blob_threshold_display | str | Human-readable blob threshold. |
Returns {"available": False} if the blob_state table does not exist.
get_blob_histogram() -> list[dict]
Return the blob size distribution as logarithmic buckets. Each dict in the list contains:

| Key | Type | Description |
| --- | --- | --- |
| label | str | Bucket range label (for example, "100.0 KB -- 1.0 MB"). |
| count | int | Number of blobs in this bucket. |
| pct | int | Percentage relative to the largest bucket (0–100). |
| tier | str | "pg", "s3", "mixed", or "" (no S3 configured). |

Returns [] if no blobs exist.
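The bucketing can be sketched in pure Python as follows; the decade bucket boundaries, the humanize helper, and the omission of the tier key are our simplifications, not the storage's exact implementation:

```python
import math

def humanize(n: float) -> str:
    """Illustrative renderer for the *_display keys (decimal units)."""
    for unit in ("B", "KB", "MB", "GB", "TB"):
        if n < 1000:
            return f"{n:.1f} {unit}"
        n /= 1000
    return f"{n:.1f} PB"

def blob_histogram(sizes):
    """Group blob sizes into decade (power-of-ten) buckets and scale
    each count against the largest bucket, in the shape returned by
    get_blob_histogram() (the per-bucket tier is omitted here)."""
    counts = {}
    for size in sizes:
        exponent = int(math.log10(size)) if size > 0 else 0
        counts[exponent] = counts.get(exponent, 0) + 1
    biggest = max(counts.values(), default=0)
    return [
        {
            "label": f"{humanize(10 ** e)} -- {humanize(10 ** (e + 1))}",
            "count": c,
            "pct": round(100 * c / biggest),
        }
        for e, c in sorted(counts.items())
    ]
```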
History mode conversion methods¶
convert_to_history_free() -> dict
Convert a history-preserving database to history-free mode. Drops the object_history, pack_state, and deprecated blob_history tables. Removes old blob versions from blob_state (keeps only the latest tid per zoid) and orphaned transaction_log entries. Returns a dict with counts: history_rows, pack_rows, blob_history_rows, old_blob_versions, orphan_transactions. Raises RuntimeError if no history tables exist.
convert_to_history_preserving() -> None
Convert a history-free database to history-preserving mode. Creates the object_history and pack_state tables. Existing objects gain history tracking on their next modification. Raises RuntimeError if history tables already exist.
compact_history() -> tuple[int, int]
Remove duplicate entries created by the old dual-write mode. Returns (deleted_object_history_rows, deleted_blob_history_rows). Returns (0, 0) in history-free mode.
State processor registration¶
register_state_processor(processor) -> None
Register a state processor plugin. See State processor API for the processor protocol.
PGJsonbStorageInstance¶
from zodb_pgjsonb.storage import PGJsonbStorageInstance
Declared interfaces: IBlobStorage.
Inherits from: ConflictResolvingStorage.
Properties¶
pg_connection -> psycopg.Connection
The underlying psycopg connection (read-only). Shares the same REPEATABLE READ snapshot used for ZODB loads, so external queries see a consistent point-in-time view.
IMVCCStorage methods¶
new_instance() -> PGJsonbStorageInstance
Delegate to the main storage's new_instance().
release() -> None
Return the connection to the pool, end any active read transaction, and remove the blob temp directory.
poll_invalidations() -> list[bytes]
Return OIDs changed since the last poll. Ends any previous read snapshot, starts a new REPEATABLE READ snapshot, queries for changes, and invalidates affected cache entries. Returns [] if nothing changed.
sync(force: bool = True) -> None
Sync snapshot. No-op (autocommit mode provides the latest committed data).
IStorage read methods¶
load(oid: bytes, version: str = "") -> tuple[bytes, bytes]
Load the current object state. Returns (pickle_bytes, tid_bytes). Results are cached in the per-instance LRU cache.
loadBefore(oid: bytes, tid: bytes) -> tuple[bytes, bytes, bytes] | None
Load object data before a given TID.
loadSerial(oid: bytes, serial: bytes) -> bytes
Load a specific revision of an object.
IStorage write methods¶
new_oid() -> bytes
Delegate OID allocation to the main storage.
store(oid: bytes, serial: bytes, data: bytes, version: str, transaction) -> None
Queue an object for storage. Conflict detection is deferred to tpc_vote().
checkCurrentSerialInTransaction(oid: bytes, serial: bytes, transaction) -> None
Queue a read-conflict check for batch verification in tpc_vote().
Two-phase commit¶
tpc_begin(transaction, tid: bytes | None = None, status: str = " ") -> None
Begin a two-phase commit. Ends any active read snapshot, applies deferred DDL, starts a PostgreSQL transaction, acquires advisory lock 0, and generates a TID.
tpc_vote(transaction) -> list | None
Flush pending stores and blobs to PostgreSQL. Performs batch conflict detection, writes all data, and calls finalize() on each state processor.
tpc_finish(transaction, f: callable | None = None) -> bytes
Commit the PostgreSQL transaction. Updates the main storage's last TID and runs the optional callback. Returns TID bytes.
tpc_abort(transaction) -> None
Roll back the PostgreSQL transaction and clean up temp blob files.
IBlobStorage methods¶
storeBlob(oid: bytes, oldserial: bytes, data: bytes, blobfilename: str, version: str, transaction) -> None
Store object data and a blob file.
loadBlob(oid: bytes, serial: bytes) -> str
Return the path to a file containing the blob data.
openCommittedBlobFile(oid: bytes, serial: bytes, blob=None) -> IO
Open a committed blob file for reading.
temporaryDirectory() -> str
Return the directory path for uncommitted blob data.
IStorageRestoreable methods¶
restore(oid: bytes, serial: bytes, data: bytes, version: str, prev_txn, transaction) -> None
Write pre-committed data without conflict checking.
restoreBlob(oid: bytes, serial: bytes, data: bytes, blobfilename: str, prev_txn, transaction) -> None
Restore object data and a blob file without conflict checking.
Metadata (delegated to main storage)¶
sortKey() -> str
Delegate to main storage.
getName() -> str
Delegate to main storage.
__name__ -> str
Property; delegates to main storage.
isReadOnly() -> bool
Returns False.
lastTransaction() -> bytes
Return the main storage's last TID.
__len__() -> int
Delegate to main storage.
getSize() -> int
Delegate to main storage.
history(oid: bytes, size: int = 1) -> list[dict]
Delegate to main storage.
pack(t: float, referencesf) -> None
Delegate to main storage.
supportsUndo() -> bool
Delegate to main storage.
undoLog(first: int = 0, last: int = -20, filter: callable | None = None) -> list[dict]
Delegate to main storage.
undoInfo(first: int = 0, last: int = -20, specification: dict | None = None) -> list[dict]
Delegate to main storage.
undo(transaction_id: bytes, transaction=None) -> tuple[bytes, list[bytes]]
Undo a transaction using this instance's connection.
registerDB(db) -> None
No-op.
close() -> None
Calls release().
LoadCache¶
from zodb_pgjsonb.storage import LoadCache
Bounded LRU cache for load() results.
Stores (pickle_bytes, tid_bytes) keyed by zoid (int).
Each PGJsonbStorageInstance has its own cache; thread safety is not required.
Constructor¶
LoadCache(max_mb: int = 16)
Methods¶
get(zoid: int) -> tuple[bytes, bytes] | None
Look up by zoid. Returns (data, tid) or None. Promotes the entry on hit.
set(zoid: int, data: bytes, tid: bytes) -> None
Store (data, tid) for a zoid. Evicts LRU entries if the cache exceeds the byte budget.
invalidate(zoid: int) -> None
Remove a single zoid from the cache.
clear() -> None
Remove all entries.
Properties¶
size_mb -> float
Current cache size in megabytes.
Attributes¶
hits: int
Number of cache hits.
misses: int
Number of cache misses.
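The get/set/eviction behavior described above can be sketched with a byte-budgeted OrderedDict; this TinyLoadCache is an illustration of the semantics, not the real class:

```python
from collections import OrderedDict

class TinyLoadCache:
    """Illustrative sketch of LoadCache's behavior: an LRU map from
    zoid to (data, tid) bounded by a byte budget."""

    def __init__(self, max_mb: int = 16):
        self._max_bytes = max_mb * 1024 * 1024
        self._bytes = 0
        self._entries = OrderedDict()  # zoid -> (data, tid), LRU order
        self.hits = 0
        self.misses = 0

    def get(self, zoid):
        entry = self._entries.get(zoid)
        if entry is None:
            self.misses += 1
            return None
        self._entries.move_to_end(zoid)  # promote on hit
        self.hits += 1
        return entry

    def set(self, zoid, data, tid):
        if zoid in self._entries:
            old_data, old_tid = self._entries.pop(zoid)
            self._bytes -= len(old_data) + len(old_tid)
        self._entries[zoid] = (data, tid)
        self._bytes += len(data) + len(tid)
        # Evict least-recently-used entries until within budget
        # (the newest entry is always kept in this sketch).
        while self._bytes > self._max_bytes and len(self._entries) > 1:
            _, (d, t) = self._entries.popitem(last=False)
            self._bytes -= len(d) + len(t)
```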
Packer¶
from zodb_pgjsonb.packer import pack
pack(conn, pack_time: bytes | None = None, history_preserving: bool = False) -> tuple[int, int, list[str]]
Remove unreachable objects and their blobs. Uses a recursive CTE to find all objects reachable from the root (zoid 0). Returns (deleted_objects, deleted_blobs, s3_keys_to_delete). The caller is responsible for deleting the S3 objects in the returned key list.
In history-preserving mode with pack_time, objects created or modified after pack_time are preserved even if currently unreachable. Old revisions in object_history and blob_state before pack_time are removed for reachable objects. Orphaned transaction_log entries at or before pack_time are deleted.
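In Python terms, the reachability computation that the recursive CTE performs looks like this sketch (the unreachable helper and its references mapping are illustrative, not part of the packer's API):

```python
def unreachable(references: dict, root: int = 0) -> set:
    """Return the zoids not reachable from the root, mirroring what
    the pack CTE computes in SQL. `references` maps each zoid to the
    set of zoids its state references."""
    reachable, stack = {root}, [root]
    while stack:
        for ref in references.get(stack.pop(), ()):
            if ref not in reachable:
                reachable.add(ref)
                stack.append(ref)
    return set(references) - reachable

# A cycle (3 <-> 4) that nothing references is still collected.
refs = {0: {1, 2}, 1: {2}, 2: set(), 3: {4}, 4: {3}}
assert unreachable(refs) == {3, 4}
```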
ZMI integration¶
The zodb_pgjsonb.zmi module patches App.ApplicationManager.AltDatabaseManager
to add a “Blob Storage” tab in the Zope Management Interface when the database
uses PGJsonbStorage.
The patch is applied automatically during ZConfig factory initialization when
Zope (App package) is available.
The tab displays blob statistics from get_blob_stats() and
get_blob_histogram().
Access requires the Manager role.