Skip to content

disdantic

Package initialization and unified entry point for the disdantic library.

This library simplifies registry management, polymorphic serialization, dynamic schema generation, and automatic module discovery. It provides mixins and managers to register and retrieve subclasses dynamically, making it easier to construct polymorphic data structures without manual boilerplate.

The core architecture exposes base components including RegistryMixin and InfoMixin for behavior tracking, LazyLoader and LazyProxy for performance-focused import loading, along with configure_logger and package-level Settings for system initialization.

AutoImporterMixin

Provides recursive package directory module scanning with cache wiping.

This mixin enables classes to dynamically discover and import submodules in a configured package hierarchy. It is typically integrated into registry systems to trigger dynamic registration of Pydantic model subclasses at runtime.

Example

.. code-block:: python

from disdantic.importer import AutoImporterMixin

class MyRegistry(AutoImporterMixin):
    auto_package = "my_app.models"
    auto_ignore_modules = ["my_app.models.private_model"]

# Scan and load the modules
MyRegistry.auto_import_package_modules()
Source code in src/disdantic/importer.py
class AutoImporterMixin:
    """
    Provides recursive package directory module scanning with cache wiping.

    This mixin enables classes to dynamically discover and import submodules
    in a configured package hierarchy. It is typically integrated into registry
    systems to trigger dynamic registration of Pydantic model subclasses at runtime.

    Packages and ignore lists are merged from two places: the class variables
    declared below and the object returned by ``get_settings()``.

    Example:
        .. code-block:: python

            from disdantic.importer import AutoImporterMixin

            class MyRegistry(AutoImporterMixin):
                auto_package = "my_app.models"
                auto_ignore_modules = ["my_app.models.private_model"]

            # Scan and load the modules
            MyRegistry.auto_import_package_modules()
    """

    auto_package: ClassVar[str | Sequence[str] | None] = None
    """The target package or packages to scan for auto-importing."""

    auto_ignore_modules: ClassVar[Sequence[str] | None] = None
    """A list of submodule names or paths to ignore during package discovery."""

    # Names of modules this mixin has seen while scanning. The set is created
    # once on the mixin itself, so every subclass that does not rebind the
    # attribute mutates the same object.
    _auto_imported_modules: ClassVar[set[str]] = set()

    @classmethod
    def auto_import_package_modules(cls) -> None:
        """
        Walks configured module layouts recursively and imports submodules.

        This method scans the configured packages, ignoring designated modules,
        and imports all discovered submodules to trigger dynamic registration.
        Roots that import successfully but are plain modules (no ``__path__``)
        are skipped silently. Modules already present in ``sys.modules`` are
        recorded as tracked without being imported again.

        :raises MissingPackagesError: If neither the class variable
            'auto_package' nor the settings' 'auto_packages' yields a package.
            (Earlier documentation lists this as ``ValueError``; presumably
            ``MissingPackagesError`` derives from it -- confirm.)
        :raises ImportError: If a target package name cannot be resolved.
        :returns: None.
        """
        packages = cls._resolve_auto_packages()
        if not packages:
            raise MissingPackagesError(
                f"The class variable 'auto_package' must be configured on "
                f"{cls.__name__} or 'auto_packages' configured in settings "
                f"to enable automated package discovery."
            )

        ignore_set = cls._resolve_ignore_set()

        for package_name in packages:
            try:
                package = importlib.import_module(package_name)  # nosemgrep
            except ModuleNotFoundError as err:
                raise ImportError(
                    f"Target auto_package root '{package_name}' could not be resolved."
                ) from err

            # Only packages expose __path__; a plain module has nothing to walk.
            if not hasattr(package, "__path__"):
                continue

            for _, module_name, is_pkg in pkgutil.walk_packages(
                package.__path__,
                f"{package.__name__}.",
            ):
                # Sub-packages are not tracked here; walk_packages still
                # descends into them and yields their modules.
                if (
                    is_pkg
                    or module_name in ignore_set
                    or module_name in cls._auto_imported_modules
                ):
                    continue

                # Already loaded elsewhere: remember it, but do not re-import.
                if module_name in sys.modules:
                    cls._auto_imported_modules.add(module_name)
                    continue

                importlib.import_module(module_name)  # nosemgrep
                cls._auto_imported_modules.add(module_name)

    @classmethod
    def reset_importer_cache(cls) -> None:
        """
        Purges cached system modules imported by this class.

        Clears the tracked imported submodules from both sys.modules and the
        internal tracking set to guarantee clean test executions. Note that the
        tracking set also contains modules that were already loaded when the
        scan encountered them, so those are evicted from ``sys.modules`` too.

        :returns: None.
        """
        # Iterate over a snapshot so the set is not read while being cleared.
        for module_name in list(cls._auto_imported_modules):
            sys.modules.pop(module_name, None)
        cls._auto_imported_modules.clear()

    @classmethod
    def _resolve_auto_packages(cls) -> list[str]:
        # Resolves and deduplicates the list of target packages to scan.
        # Class-level packages come first, followed by those from settings.
        packages: list[str] = []
        if cls.auto_package:
            if isinstance(cls.auto_package, str):
                packages.append(cls.auto_package)
            else:
                packages.extend(cls.auto_package)

        settings = get_settings()
        if settings.auto_packages:
            packages.extend(settings.auto_packages)

        # dict preserves insertion order, so this drops duplicates while
        # keeping the first occurrence of each package in place.
        return list(dict.fromkeys(packages))

    @classmethod
    def _resolve_ignore_set(cls) -> set[str]:
        # Resolves and merges package/settings ignore lists into one set of
        # fully qualified module names.
        ignore_set = set(cls.auto_ignore_modules or [])
        settings = get_settings()
        if settings.auto_ignore_modules:
            ignore_set.update(settings.auto_ignore_modules)
        return ignore_set

auto_ignore_modules = None class-attribute

A list of submodule names or paths to ignore during package discovery.

auto_package = None class-attribute

The target package or packages to scan for auto-importing.

auto_import_package_modules() classmethod

Walks configured module layouts recursively and imports submodules.

This method scans the configured packages, ignoring designated modules, and imports all discovered submodules to trigger dynamic registration.

:raises ValueError: If the class variable 'auto_package' has not been properly configured and settings do not specify any auto_packages (the source listing raises `MissingPackagesError` for this case).

:raises ImportError: If a target package name cannot be resolved.

:returns: None.

Source code in src/disdantic/importer.py
@classmethod
def auto_import_package_modules(cls) -> None:
    """
    Import every submodule found beneath the configured packages.

    The package list and the ignore set are obtained from the class helpers
    ``_resolve_auto_packages`` and ``_resolve_ignore_set``. Each root package
    is imported, then its tree is walked; non-package modules that are neither
    ignored nor already tracked are imported (unless already present in
    ``sys.modules``) and recorded in ``_auto_imported_modules``.

    :raises MissingPackagesError: If no package is configured on the class
        or in settings.
    :raises ImportError: If a target package name cannot be resolved.
    :returns: None.
    """
    packages = cls._resolve_auto_packages()
    if not packages:
        raise MissingPackagesError(
            f"The class variable 'auto_package' must be configured on "
            f"{cls.__name__} or 'auto_packages' configured in settings "
            f"to enable automated package discovery."
        )

    skipped = cls._resolve_ignore_set()

    for package_name in packages:
        try:
            root = importlib.import_module(package_name)  # nosemgrep
        except ModuleNotFoundError as err:
            raise ImportError(
                f"Target auto_package root '{package_name}' could not be resolved."
            ) from err

        # A root without __path__ is a plain module: nothing to walk.
        try:
            search_path = root.__path__
        except AttributeError:
            continue

        prefix = f"{root.__name__}."
        for _, dotted_name, is_pkg in pkgutil.walk_packages(search_path, prefix):
            if is_pkg or dotted_name in skipped:
                continue
            if dotted_name in cls._auto_imported_modules:
                continue

            # Import only what is not loaded yet; track it either way.
            if dotted_name not in sys.modules:
                importlib.import_module(dotted_name)  # nosemgrep
            cls._auto_imported_modules.add(dotted_name)

reset_importer_cache() classmethod

Purges cached system modules imported by this class.

Clears the tracked imported submodules from both sys.modules and the internal tracking set to guarantee clean test executions.

:returns: None.

Source code in src/disdantic/importer.py
@classmethod
def reset_importer_cache(cls) -> None:
    """
    Forget every module tracked by this class.

    Each tracked module name is removed from ``sys.modules`` (names that are
    no longer present there are ignored) and the internal tracking set is
    left empty, so a later scan starts from a clean slate.

    :returns: None.
    """
    tracked = cls._auto_imported_modules
    # Drain the set one name at a time; it is empty once the loop ends.
    while tracked:
        sys.modules.pop(tracked.pop(), None)

DiagnosticsReport

Bases: BaseModel

Aggregated health status and discovery details of all checked registries.

This class serves as the root container returned by the verification pipeline. It consolidates individual registry diagnostics, scanned packages, and import failure traces into a single report.

Examples:

.. code-block:: python

from disdantic.diagnose import verify_registries

report = verify_registries()
if not report.is_healthy:
    print(f"Scanned packages: {report.scanned_packages}")
    print(f"Import errors: {report.import_errors}")
Source code in src/disdantic/diagnose.py
class DiagnosticsReport(BaseModel):
    """Aggregated health status and discovery details of all checked registries.

    This class serves as the root container returned by the verification pipeline. It
    consolidates individual registry diagnostics, scanned packages, and import failure
    traces into a single report.

    ``is_healthy`` and ``scanned_packages`` declare no default and must be supplied;
    ``registries`` and ``import_errors`` default to a fresh empty list per instance.

    Examples:
        .. code-block:: python

            from disdantic.diagnose import verify_registries

            report = verify_registries()
            if not report.is_healthy:
                print(f"Scanned packages: {report.scanned_packages}")
                print(f"Import errors: {report.import_errors}")
    """

    # Overall verdict for the whole check (required, no default).
    is_healthy: bool = Field(
        description=(
            "Indicates whether the entire check is clean, enabling safe dynamic model "
            "resolution with zero registry or import failures."
        )
    )
    # Names of the Python packages that were traversed (required, no default).
    scanned_packages: list[str] = Field(
        description="Maps the list of Python package names traversed during discovery."
    )
    # One diagnostics entry per registry; default_factory gives each report
    # its own list instead of a shared mutable default.
    registries: list[RegistryDiagnostics] = Field(
        default_factory=list,
        description=(
            "Configures the collection of registry-specific diagnostics reports."
        ),
    )
    # Traceback text for each import failure met while scanning.
    import_errors: list[str] = Field(
        default_factory=list,
        description=(
            "Maps the traceback strings for import errors encountered during scanning."
        ),
    )

InfoMixin

Mixin providing runtime self-introspection to generate object structures.

This mixin allows subclassing models to expose their public attributes, properties, slots, and instance dicts as sanitized primitives. It recursively inspects objects, resolves lazy loaders or proxies, and handles circular reference loops and property extraction errors without raising exceptions.

Source code in src/disdantic/introspection.py
class InfoMixin:
    """Mixin providing runtime self-introspection to generate object structures.

    This mixin allows subclassing models to expose their public attributes,
    properties, slots, and instance dicts as sanitized primitives. It recursively
    inspects objects, resolves lazy loaders or proxies, and handles circular reference
    loops and property extraction errors without raising exceptions.
    """

    @classmethod
    def extract_from_obj(
        cls, obj: Any, visited: set[int] | None = None
    ) -> dict[str, Any]:
        """Parse complex objects into sanitized primitive dictionaries.

        This method recursively crawls the object to extract public fields,
        evaluates custom `.info` hooks if defined, and translates collections
        or nested objects into JSON/YAML compatible dictionaries.

        :param obj: The object (or class) to describe.
        :param visited: ``id()`` values of the objects currently being traversed,
            used to detect circular references. A new set is created when omitted.
        :returns: Either the sanitized result of the object's custom ``info`` hook,
            or a dictionary with the keys ``str``, ``type``, ``module`` and
            ``attributes``.
        """
        if visited is None:
            visited = set()

        if not isinstance(obj, type) and obj is not cls:
            try:
                info_class_attr = getattr(type(obj), "info", None)
                if not _is_default_info(info_class_attr) and (
                    info_class_attr is not None or hasattr(obj, "info")
                ):
                    info_val = obj.info
                    raw_info = info_val() if callable(info_val) else info_val
                    return dict(cls._sanitize(raw_info, visited))
            except Exception:  # noqa: BLE001, S110
                # Deliberate best effort: a failing or non-mapping custom hook
                # falls through to the generic attribute extraction below.
                pass

        obj_class = getattr(obj, "__class__", type(obj))
        obj_id = id(obj)

        # Mark the object as "in progress" only for the duration of the crawl.
        visited.add(obj_id)
        try:
            attributes = cls._extract_attributes(obj, visited)
        finally:
            visited.discard(obj_id)

        return {
            "str": cls._sanitize_fallback(obj),
            "type": obj_class.__name__,
            "module": obj_class.__module__,
            "attributes": attributes,
        }

    def __repr__(self) -> str:
        """__repr__ delegating to MRO overrides or fallback."""
        return self._delegate_dunder("__repr__")

    def __str__(self) -> str:
        """__str__ delegating to MRO overrides or fallback."""
        return self._delegate_dunder("__str__")

    @property
    def info(self) -> dict[str, Any]:
        """Self-introspection dictionary representing the public state."""
        return self.extract_from_obj(self)

    def info_json(
        self,
        *,
        indent: int | None = None,
        sort_keys: bool = False,
        **kwargs: Any,
    ) -> str:
        """Serialize the introspection info dictionary into a valid JSON string.

        :param indent: Forwarded to ``json.dumps``.
        :param sort_keys: Forwarded to ``json.dumps``.
        :param kwargs: Any further keyword arguments for ``json.dumps``.
        :returns: The JSON document as a string.
        """
        prepared = self._sanitize(self.info, set())
        return json.dumps(prepared, indent=indent, sort_keys=sort_keys, **kwargs)

    def info_yaml(
        self,
        *,
        indent: int | None = None,
        sort_keys: bool = False,
        **kwargs: Any,
    ) -> str:
        """Serialize the introspection info dictionary into a valid YAML string.

        :param indent: Forwarded to ``yaml.dump``.
        :param sort_keys: Forwarded to ``yaml.dump``.
        :param kwargs: Any further keyword arguments for ``yaml.dump``.
        :returns: The YAML document as a string.
        :raises ImportError: If the module-level ``yaml`` name is ``None``
            (PyYAML is not available).
        """
        if yaml is None:
            raise ImportError(
                "PyYAML is required for YAML serialization. "
                "Install disdantic with the 'yaml' extra: pip install disdantic[yaml]"
            )
        prepared = self._sanitize(self.info, set())
        return yaml.dump(prepared, indent=indent, sort_keys=sort_keys, **kwargs)

    def _delegate_dunder(self, name: str) -> str:
        # Delegate repr/str dunder method to MRO overrides or fallback.
        # The framework base classes are skipped by name so that only an
        # override written on a user class is honoured.
        for cls_item in self.__class__.__mro__:
            if cls_item.__name__ in (
                "InfoMixin",
                "object",
                "BaseModel",
                "ReloadableBaseModel",
            ):
                continue
            if name in cls_item.__dict__:
                return str(getattr(cls_item, name)(self))
        return f"<{self.__class__.__name__} info={self.info}>"

    @classmethod
    def _is_class_variable(cls, obj: Any, key: str) -> bool:
        # Check if the key is defined on class, not instance dict/slots
        if isinstance(obj, type):
            return False

        if hasattr(obj, "__dict__") and key in obj.__dict__:
            return False

        # Slots check across MRO
        for mro_cls in type(obj).__mro__:
            slots = getattr(mro_cls, "__slots__", None)
            if isinstance(slots, str):
                # A bare string declares exactly one slot. Without this
                # normalisation ``key in slots`` would be a substring test
                # and e.g. "val" would match ``__slots__ = "value"``.
                slots = (slots,)
            if slots and key in slots:
                return False

        # Check if the key is defined on any class in the MRO.
        for mro_cls in type(obj).__mro__:
            if key in mro_cls.__dict__:
                class_attr = mro_cls.__dict__[key]
                # If it has a __get__ method (e.g. property, method, descriptor),
                # it's evaluated on the instance and represents instance state.
                return not hasattr(class_attr, "__get__")

        return False

    @classmethod
    def _has_info_protocol(cls, val: Any) -> bool:
        # Check if an object implements the info protocol without evaluating it.
        # Looks at the type and at the instance __dict__ only, so a property
        # named ``info`` is detected without running its getter.
        try:
            return getattr(type(val), "info", None) is not None or (
                hasattr(val, "__dict__") and "info" in val.__dict__
            )
        except Exception:  # noqa: BLE001
            return False

    @classmethod
    def _extract_attributes(cls, obj: Any, visited: set[int]) -> dict[str, Any]:
        # Scrapes attributes from instance spaces, slots, and properties safely.
        # Private names, configured exclusions, framework bookkeeping
        # attributes, plain class variables and callables are all left out.
        attributes: dict[str, Any] = {}

        exclude_keys = set(get_settings().info_exclude_keys)
        is_pydantic = isinstance(obj, BaseModel)
        is_registry = isinstance(obj, RegistryMixin)

        for key in dir(obj):
            if key.startswith("_") or key in exclude_keys:
                continue

            if is_pydantic and key in (
                "model_fields",
                "model_computed_fields",
                "model_config",
                "model_fields_set",
                "model_extra",
            ):
                continue

            if is_registry and key in (
                "registry",
                "registry_auto_discovery",
                "registry_populated",
                "schema_discriminator",
            ):
                continue

            if cls._is_class_variable(obj, key):
                continue

            try:
                val = getattr(obj, key)
                if isinstance(val, collections.abc.Callable):
                    continue

                attributes[key] = cls._sanitize(val, visited)
            except Exception as err:  # noqa: BLE001
                # A raising property must not abort the whole extraction;
                # the error is recorded in place of the value.
                attributes[key] = f"<Extraction Error: {err!r}>"

        return attributes

    @classmethod
    def _sanitize(cls, val: Any, visited: set[int]) -> Any:
        # Unified recursive sanitization and preparation for serialization.
        # Lazy proxies are resolved first; dict keys are stringified; lists,
        # tuples and sets all become lists; anything on the current traversal
        # path is replaced by a circular-reference marker.
        if isinstance(val, LazyProxy):
            val = val._resolve()  # noqa: SLF001

        val_id = id(val)
        if val_id in visited:
            res = f"<CircularReference: ID {val_id}>"
        elif isinstance(val, PRIMITIVE_TYPES):
            res = val
        elif isinstance(val, dict):
            visited.add(val_id)
            try:
                res = {
                    str(item_key): cls._sanitize(item_val, visited)
                    for item_key, item_val in val.items()
                }
            finally:
                visited.discard(val_id)
        elif isinstance(val, list | tuple | set):
            visited.add(val_id)
            try:
                res = [cls._sanitize(item, visited) for item in val]
            finally:
                visited.discard(val_id)
        elif isinstance(val, type):
            try:
                res = repr(val)
            except Exception:  # noqa: BLE001
                res = f"<{val.__name__} object at {hex(id(val))}>"
        elif cls._has_info_protocol(val):
            res = cls._sanitize_custom(val, val_id, visited)
        else:
            res = cls._sanitize_fallback(val)

        return res

    @classmethod
    def _sanitize_custom(cls, val: Any, val_id: int, visited: set[int]) -> Any:
        # Sanitize a custom object that implements the info protocol.
        # Objects using the default ``info`` go through extract_from_obj;
        # custom hooks are evaluated (called if callable) and sanitized.
        # Any failure degrades to the safe string fallback.
        try:
            info_class_attr = getattr(type(val), "info", None)
            visited.add(val_id)
            try:
                if _is_default_info(info_class_attr):
                    return cls.extract_from_obj(val, visited)
                info_val = val.info
                raw_info = info_val() if callable(info_val) else info_val
                return cls._sanitize(raw_info, visited)
            finally:
                visited.discard(val_id)
        except Exception:  # noqa: BLE001
            return cls._sanitize_fallback(val)

    @classmethod
    def _sanitize_fallback(cls, val: Any) -> str:
        # Get a safe string representation of val, avoiding infinite loops.
        # For InfoMixin descendants ``str(val)`` is only used when a user
        # class overrides ``__str__``; otherwise the mixin's own ``__str__``
        # would call back into ``info`` and recurse.
        val_class = getattr(val, "__class__", type(val))
        fallback = f"<{val_class.__name__} object at {hex(id(val))}>"
        try:
            mro = val_class.__mro__
            if not any(mro_cls.__name__ == "InfoMixin" for mro_cls in mro):
                return str(val)
            for cls_item in mro:
                if (
                    cls_item.__name__
                    not in (
                        "InfoMixin",
                        "object",
                        "BaseModel",
                        "ReloadableBaseModel",
                    )
                    and "__str__" in cls_item.__dict__
                ):
                    return str(val)
            return fallback
        except Exception:  # noqa: BLE001
            return fallback

info property

Self-introspection dictionary representing the public state.

__repr__()

`__repr__` delegating to MRO overrides or fallback.

Source code in src/disdantic/introspection.py
def __repr__(self) -> str:
    """Return the text produced by ``_delegate_dunder`` for ``__repr__``."""
    rendered = self._delegate_dunder("__repr__")
    return rendered

__str__()

`__str__` delegating to MRO overrides or fallback.

Source code in src/disdantic/introspection.py
def __str__(self) -> str:
    """Return the text produced by ``_delegate_dunder`` for ``__str__``."""
    rendered = self._delegate_dunder("__str__")
    return rendered

extract_from_obj(obj, visited=None) classmethod

Parse complex objects into sanitized primitive dictionaries.

This method recursively crawls the object to extract public fields, evaluates custom .info hooks if defined, and translates collections or nested objects into JSON/YAML compatible dictionaries.

Source code in src/disdantic/introspection.py
@classmethod
def extract_from_obj(
    cls, obj: Any, visited: set[int] | None = None
) -> dict[str, Any]:
    """Parse complex objects into sanitized primitive dictionaries.

    This method recursively crawls the object to extract public fields,
    evaluates custom `.info` hooks if defined, and translates collections
    or nested objects into JSON/YAML compatible dictionaries.
    """
    if visited is None:
        visited = set()

    if not isinstance(obj, type) and obj is not cls:
        try:
            info_class_attr = getattr(type(obj), "info", None)
            if not _is_default_info(info_class_attr) and (
                info_class_attr is not None or hasattr(obj, "info")
            ):
                info_val = obj.info
                raw_info = info_val() if callable(info_val) else info_val
                return dict(cls._sanitize(raw_info, visited))
        except Exception:  # noqa: BLE001, S110
            pass

    obj_class = getattr(obj, "__class__", type(obj))
    obj_id = id(obj)

    visited.add(obj_id)
    try:
        attributes = cls._extract_attributes(obj, visited)
    finally:
        visited.discard(obj_id)

    return {
        "str": cls._sanitize_fallback(obj),
        "type": obj_class.__name__,
        "module": obj_class.__module__,
        "attributes": attributes,
    }

info_json(*, indent=None, sort_keys=False, **kwargs)

Serialize the introspection info dictionary into a valid JSON string.

Source code in src/disdantic/introspection.py
def info_json(
    self,
    *,
    indent: int | None = None,
    sort_keys: bool = False,
    **kwargs: Any,
) -> str:
    """Serialize the introspection info dictionary into a valid JSON string."""
    prepared = self._sanitize(self.info, set())
    return json.dumps(prepared, indent=indent, sort_keys=sort_keys, **kwargs)

info_yaml(*, indent=None, sort_keys=False, **kwargs)

Serialize the introspection info dictionary into a valid YAML string.

Source code in src/disdantic/introspection.py
def info_yaml(
    self,
    *,
    indent: int | None = None,
    sort_keys: bool = False,
    **kwargs: Any,
) -> str:
    """Render the sanitized ``info`` dictionary as a YAML string.

    :param indent: Forwarded to ``yaml.dump``.
    :param sort_keys: Forwarded to ``yaml.dump``.
    :param kwargs: Any further keyword arguments for ``yaml.dump``.
    :returns: The YAML document as a string.
    :raises ImportError: If the module-level ``yaml`` name is ``None``.
    """
    if yaml is not None:
        payload = self._sanitize(self.info, set())
        return yaml.dump(payload, indent=indent, sort_keys=sort_keys, **kwargs)

    raise ImportError(
        "PyYAML is required for YAML serialization. "
        "Install disdantic with the 'yaml' extra: pip install disdantic[yaml]"
    )

LazyLoader

Provides thread-safe lazy loading decorators for modules, classes, and variables.

Encapsulates static utilities to defer package imports and object instantiation. It supports lazy module resolution, injecting descriptors into classes, and wrapping functional closures inside proxy containers.

Example

.. code-block:: python

from disdantic.loading import LazyLoader

# Lazy-load a module namespace
lazy_sys = LazyLoader.load_module_proxy("sys")
Source code in src/disdantic/loading.py
class LazyLoader:
    """Provides thread-safe lazy loading decorators for modules, classes, and variables.

    Encapsulates static utilities to defer package imports and object instantiation.
    It supports lazy module resolution, injecting descriptors into classes, and
    wrapping functional closures inside proxy containers.

    Example:
        .. code-block:: python

            from disdantic.loading import LazyLoader

            # Lazy-load a module namespace
            lazy_sys = LazyLoader.load_module_proxy("sys")
    """

    # Re-entrant on purpose: an import started while the lock is held may
    # call back into this class on the same thread, which would deadlock on
    # a plain Lock.
    _global_lock = threading.RLock()

    @classmethod
    def module(cls, module_name: str) -> Callable[[types.ModuleType], types.ModuleType]:
        """Create a decorator to lazy-load module subpackages on demand.

        The returned decorator ignores the object it decorates, builds a new
        module object named ``module_name`` and stores it in ``sys.modules``
        under that name. Looking up a missing attribute ``x`` on that module
        imports ``f"{module_name}.x"`` and returns the result.

        NOTE(review): the replacement module is created with a name only and
        therefore has no ``__path__``; it looks like only submodules already
        present in ``sys.modules`` can be resolved this way -- confirm.

        Example:
            .. code-block:: python

                import sys
                from disdantic.loading import LazyLoader

                @LazyLoader.module("disdantic")
                def loading(mod):
                    pass

        :param module_name: The fully qualified name of the package or module.
        :returns: A decorator function that replaces the target module with
            a lazy module proxy.
        """

        def decorator(_: types.ModuleType) -> types.ModuleType:
            class LazyModule(types.ModuleType):
                def __getattr__(self, name: str) -> Any:
                    # Dunder probes (notably ``__path__``, read by importlib
                    # itself while importing a submodule) must never trigger
                    # an import: doing so re-enters this method from inside
                    # the import below.
                    if name.startswith("__") and name.endswith("__"):
                        raise AttributeError(
                            f"Module '{module_name}' has no attribute '{name}'"
                        )
                    with cls._global_lock:
                        try:
                            target_path = f"{module_name}.{name}"
                            return importlib.import_module(target_path)  # nosemgrep
                        except ImportError as error:
                            raise AttributeError(
                                f"Module '{module_name}' has no attribute '{name}'"
                            ) from error

            lazy_mod = LazyModule(module_name)
            sys.modules[module_name] = lazy_mod
            return lazy_mod

        return decorator

    @classmethod
    def class_attributes(
        cls, mapping: dict[str, str | Callable[[], Any]]
    ) -> Callable[[type[TargetT]], type[TargetT]]:
        """Bind lazy property descriptors to class attributes to defer instantiation.

        Accepts a dictionary mapping attribute names to either importable module paths
        or factory callables, and returns a decorator for the target class. Each
        entry becomes a read-only ``property`` backed by its own ``LazyProxy``.

        Example:
            .. code-block:: python

                from disdantic.loading import LazyLoader

                @LazyLoader.class_attributes({"sys": "sys"})
                class MyClass:
                    pass

                instance = MyClass()
                # Accessing sys will load the module lazily
                print(instance.sys.path)

        :param mapping: Dictionary mapping attribute names to module paths or callables.
        :returns: A decorator function that updates the class with lazy attributes.
        """

        def decorator(target_cls: type[TargetT]) -> type[TargetT]:
            for attr_name, target in mapping.items():

                # The default argument binds the current ``target`` so each
                # property keeps its own value instead of the loop's last one.
                def _create_getter(
                    target_val: str | Callable[[], Any] = target,
                ) -> property:
                    proxy = LazyProxy(
                        lambda: (
                            importlib.import_module(target_val)  # nosemgrep
                            if isinstance(target_val, str)
                            else target_val()
                        )
                    )
                    return property(lambda _: proxy._resolve())  # noqa: SLF001

                setattr(target_cls, attr_name, _create_getter())
            return target_cls

        return decorator

    @classmethod
    def definition(cls, factory: Callable[[], Any]) -> LazyProxy:
        """Create an explicit variable proxy from a functional factory closure.

        Defers execution of the factory callable until attributes are accessed
        on the returned proxy.

        Example:
            .. code-block:: python

                from disdantic.loading import LazyLoader

                proxy = LazyLoader.definition(lambda: [1, 2, 3])
                # Factory remains uncalled until accessed
                print(len(proxy))

        :param factory: A zero-argument callable returning the target object.
        :returns: A LazyProxy instance wrapping the factory function.
        """
        return LazyProxy(factory)

    @classmethod
    def load_module_proxy(cls, fullname: str) -> types.ModuleType:
        """Generate a standard lazy loader proxy directly within the module mapping.

        Resolves the module spec and registers a lazy-loading module in sys.modules,
        only executing the module when its attributes are accessed. A module that
        is already present in ``sys.modules`` is returned unchanged.

        Example:
            .. code-block:: python

                from disdantic.loading import LazyLoader

                # Loads my_module lazily
                my_mod = LazyLoader.load_module_proxy("my_module")

        :param fullname: The fully qualified name of the module to lazy-load.
        :returns: A module proxy that triggers execution of the module on access.
        :raises ModuleNotFoundError: If the module specified by fullname cannot
            be resolved.
        """
        with cls._global_lock:
            if fullname in sys.modules:
                return sys.modules[fullname]

            spec = importlib.util.find_spec(fullname)
            if not spec or not spec.loader:
                raise ModuleNotFoundError(
                    f"No module named '{fullname}' could be resolved."
                )

            # Build the lazy loader before registering anything, so a loader
            # it rejects cannot leave a half-made entry in sys.modules.
            lazy_loader = importlib.util.LazyLoader(spec.loader)
            module = importlib.util.module_from_spec(spec)
            sys.modules[fullname] = module
            try:
                lazy_loader.exec_module(module)
            except BaseException:
                # Mirror the import system: do not keep a module whose
                # set-up failed.
                sys.modules.pop(fullname, None)
                raise
            return module

class_attributes(mapping) classmethod

Bind lazy property descriptors to class attributes to defer instantiation.

Accepts a dictionary mapping attribute names to either importable module paths or factory callables, and returns a decorator for the target class.

Example

.. code-block:: python

from disdantic.loading import LazyLoader

@LazyLoader.class_attributes({"sys": "sys"})
class MyClass:
    pass

instance = MyClass()
# Accessing sys will load the module lazily
print(instance.sys.path)

:param mapping: Dictionary mapping attribute names to module paths or callables.

:returns: A decorator function that updates the class with lazy attributes.

Source code in src/disdantic/loading.py
@classmethod
def class_attributes(
    cls, mapping: dict[str, str | Callable[[], Any]]
) -> Callable[[type[TargetT]], type[TargetT]]:
    """Attach lazily resolved read-only properties to a class.

    Every key of ``mapping`` becomes a ``property`` on the decorated class.
    A string value is imported as a module on first resolution; any other
    value is called with no arguments. Each property is backed by its own
    ``LazyProxy``, created when the decorator is applied.

    Example:
        .. code-block:: python

            from disdantic.loading import LazyLoader

            @LazyLoader.class_attributes({"sys": "sys"})
            class MyClass:
                pass

            instance = MyClass()
            # Accessing sys will load the module lazily
            print(instance.sys.path)

    :param mapping: Dictionary mapping attribute names to module paths or callables.
    :returns: A decorator function that updates the class with lazy attributes.
    """

    def _lazy_property(source: str | Callable[[], Any]) -> property:
        # ``source`` is a parameter, so each property closes over its own
        # value rather than a shared loop variable.
        def _produce() -> Any:
            if isinstance(source, str):
                return importlib.import_module(source)  # nosemgrep
            return source()

        proxy = LazyProxy(_produce)

        def _read(_instance: Any) -> Any:
            return proxy._resolve()  # noqa: SLF001

        return property(_read)

    def decorator(target_cls: type[TargetT]) -> type[TargetT]:
        for attr_name in mapping:
            setattr(target_cls, attr_name, _lazy_property(mapping[attr_name]))
        return target_cls

    return decorator

definition(factory) classmethod

Create an explicit variable proxy from a functional factory closure.

Defers execution of the factory callable until attributes are accessed on the returned proxy.

Example

.. code-block:: python

from disdantic.loading import LazyLoader

proxy = LazyLoader.definition(lambda: [1, 2, 3])
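# Factory remains uncalled until an attribute is accessed on the proxy.
# (Built-ins such as len() bypass the proxy, so use attribute access.)
print(proxy.count(2))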

:param factory: A zero-argument callable returning the target object. :returns: A LazyProxy instance wrapping the factory function.

Source code in src/disdantic/loading.py
@classmethod
def definition(cls, factory: Callable[[], Any]) -> LazyProxy:
    """Wrap a zero-argument factory in a ``LazyProxy``.

    The factory is not called here; the proxy invokes it the first time an
    attribute is read from the proxy (or ``dir()`` is called on it).

    Example:
        .. code-block:: python

            from disdantic.loading import LazyLoader

            proxy = LazyLoader.definition(lambda: [1, 2, 3])
            # Factory remains uncalled until an attribute is accessed.
            # Built-ins such as len() look up special methods on the proxy
            # type itself and are therefore not forwarded.
            print(proxy.count(2))

    :param factory: A zero-argument callable returning the target object.
    :returns: A LazyProxy instance wrapping the factory function.
    """
    deferred = LazyProxy(factory)
    return deferred

load_module_proxy(fullname) classmethod

Generate a standard lazy loader proxy directly within the module mapping.

Resolves the module spec and registers a lazy-loading module in sys.modules, only executing the module when its attributes are accessed.

Example

.. code-block:: python

from disdantic.loading import LazyLoader

# Loads my_module lazily
my_mod = LazyLoader.load_module_proxy("my_module")

:param fullname: The fully qualified name of the module to lazy-load. :returns: A module proxy that triggers execution of the module on access. :raises ModuleNotFoundError: If the module specified by fullname cannot be resolved.

Source code in src/disdantic/loading.py
@classmethod
def load_module_proxy(cls, fullname: str) -> types.ModuleType:
    """Register and return a lazily executed module for ``fullname``.

    If ``fullname`` is already present in ``sys.modules`` that entry is returned
    unchanged. Otherwise the module spec is resolved, a module object is created
    from it, registered in ``sys.modules`` and handed to
    ``importlib.util.LazyLoader`` so that the module body only runs when one of
    its attributes is first accessed.

    A failure while preparing the lazy module never leaves a half-initialised
    entry behind in ``sys.modules``.

    Example:
        .. code-block:: python

            from disdantic.loading import LazyLoader

            # Loads my_module lazily
            my_mod = LazyLoader.load_module_proxy("my_module")

    :param fullname: The fully qualified name of the module to lazy-load.
    :returns: A module proxy that triggers execution of the module on access.
    :raises ModuleNotFoundError: If the module specified by fullname cannot
        be resolved.
    :raises TypeError: If the resolved loader does not define ``exec_module``
        (raised by ``importlib.util.LazyLoader``).
    """
    with cls._global_lock:
        if fullname in sys.modules:
            return sys.modules[fullname]

        spec = importlib.util.find_spec(fullname)
        if not spec or not spec.loader:
            raise ModuleNotFoundError(
                f"No module named '{fullname}' could be resolved."
            )

        # Build the lazy loader before touching sys.modules: its constructor
        # rejects loaders without exec_module, and that rejection must not
        # leave a stray module registered under ``fullname``.
        lazy_loader = importlib.util.LazyLoader(spec.loader)
        module = importlib.util.module_from_spec(spec)

        sys.modules[fullname] = module
        try:
            lazy_loader.exec_module(module)
        except BaseException:
            # Roll back the registration so a later call (or a plain import)
            # does not pick up a module that was never made lazy.
            sys.modules.pop(fullname, None)
            raise
        return module

module(module_name) classmethod

Create a decorator to lazy-load module subpackages on demand.

Example

.. code-block:: python

import sys
from disdantic.loading import LazyLoader

@LazyLoader.module("disdantic")
def loading(mod):
    pass

:param module_name: The fully qualified name of the package or module. :returns: A decorator function that replaces the target module with a lazy module proxy.

Source code in src/disdantic/loading.py
@classmethod
def module(cls, module_name: str) -> Callable[[types.ModuleType], types.ModuleType]:
    """Create a decorator that installs a lazy submodule-loading module proxy.

    The returned decorator ignores the object it is applied to. It creates a
    ``types.ModuleType`` subclass instance named ``module_name``, registers it
    in ``sys.modules`` under that name and returns it. Reading a missing
    attribute ``x`` on the proxy imports ``"<module_name>.x"`` and returns the
    imported submodule.

    Package metadata (``__path__``, ``__file__``, ``__spec__``, ``__loader__``,
    ``__package__``) of a module already registered under ``module_name`` is
    copied onto the proxy. Without ``__path__`` the import system cannot locate
    submodules of the proxy, and its probe for ``__path__`` would re-enter the
    proxy's own ``__getattr__``.

    Example:
        .. code-block:: python

            import sys
            from disdantic.loading import LazyLoader

            @LazyLoader.module("disdantic")
            def loading(mod):
                pass

    :param module_name: The fully qualified name of the package or module.
    :returns: A decorator function that replaces the target module with
        a lazy module proxy.
    """
    # Attributes the import machinery and tooling read from a package object.
    carried_metadata = (
        "__path__",
        "__file__",
        "__spec__",
        "__loader__",
        "__package__",
    )

    def decorator(_: types.ModuleType) -> types.ModuleType:
        class LazyModule(types.ModuleType):
            def __getattr__(self, name: str) -> Any:
                # Dunder lookups are metadata probes (for example the import
                # system reading ``__path__``), never submodule requests.
                # Importing for them would recurse back into this method.
                if name.startswith("__") and name.endswith("__"):
                    raise AttributeError(
                        f"Module '{module_name}' has no attribute '{name}'"
                    )
                with cls._global_lock:
                    try:
                        target_path = f"{module_name}.{name}"
                        return importlib.import_module(target_path)  # nosemgrep
                    except ImportError as error:
                        raise AttributeError(
                            f"Module '{module_name}' has no attribute '{name}'"
                        ) from error

        lazy_mod = LazyModule(module_name)

        # Preserve the package identity of the module being replaced so that
        # on-demand submodule imports can still be resolved.
        replaced_namespace = getattr(sys.modules.get(module_name), "__dict__", {})
        for meta_name in carried_metadata:
            meta_value = replaced_namespace.get(meta_name)
            if meta_value is not None:
                setattr(lazy_mod, meta_name, meta_value)

        sys.modules[module_name] = lazy_mod
        return lazy_mod

    return decorator

LazyProxy

Proxy container wrapping modules or factories until an attribute is read.

Acts as a placeholder for objects whose construction is expensive or requires deferred execution. The target object is instantiated and resolved by invoking a provided factory function upon the first attribute access or directory listing; taking the string representation of an unresolved proxy does not trigger resolution. Resolution is guarded by a lock, so the factory runs at most once when accessed from several threads.

Example

.. code-block:: python

import types
from disdantic.loading import LazyProxy

def expensive_factory():
    return types.SimpleNamespace(data=42)

proxy = LazyProxy(expensive_factory)
# expensive_factory is not called yet
print(proxy.data)  # Triggers resolution and outputs: 42
Source code in src/disdantic/loading.py
class LazyProxy:
    """Proxy container wrapping modules or factories until an attribute is read.

    Acts as a placeholder for objects whose construction is expensive or requires
    deferred execution. The target object is created by invoking the provided
    factory on the first attribute access or ``dir()`` call. ``repr()`` of an
    unresolved proxy reports the pending factory and does not resolve it.

    Resolution is guarded by a per-proxy lock: once the factory has returned, its
    result is cached and the factory is not called again. If the factory raises,
    nothing is cached and the next access retries.

    Only attribute access is forwarded. Built-ins that look up special methods on
    the type (``len()``, iteration, operators) are not proxied.

    Example:
        .. code-block:: python

            import types
            from disdantic.loading import LazyProxy

            def expensive_factory():
                return types.SimpleNamespace(data=42)

            proxy = LazyProxy(expensive_factory)
            # expensive_factory is not called yet
            print(proxy.data)  # Triggers resolution and outputs: 42
    """

    def __init__(self, factory: Callable[[], Any]) -> None:
        """Initialize the lazy proxy wrapper with a factory callable.

        :param factory: A zero-argument callable returning the target object to wrap.
        """
        self._factory = factory
        self._wrapped: Any = None
        self._resolved = False
        self._lock = threading.Lock()

    def __getattr__(self, name: str) -> Any:
        """Retrieve attributes from the lazily resolved target object.

        Only invoked when normal lookup on the proxy fails, so the proxy's own
        state attributes normally never reach this method.

        :param name: The name of the attribute to retrieve.
        :returns: The attribute value from the resolved object.
        :raises AttributeError: If ``name`` is one of the proxy's own state
            attributes and it is missing, or if the resolved object has no such
            attribute.
        """
        # If the proxy's own state is missing (instance created without
        # __init__, e.g. while copy/pickle rebuilds it), delegating would call
        # _resolve(), which reads ``_resolved`` and re-enters this method
        # forever. Fail fast with AttributeError instead.
        if name in ("_factory", "_wrapped", "_resolved", "_lock"):
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            )
        return getattr(self._resolve(), name)

    def __dir__(self) -> list[str]:
        """Return the directory list of attributes from the resolved target object.

        Calling this resolves the proxy if it has not been resolved yet.

        :returns: List of attributes available on the wrapped object.
        """
        return dir(self._resolve())

    def __repr__(self) -> str:
        """Return a string representation of the proxy or the wrapped target.

        Never triggers resolution.

        :returns: A string indicating initialization state or the wrapped
            representation.
        """
        if not self._resolved:
            return f"<LazyProxy for uninitialized factory: {self._factory}>"
        return repr(self._wrapped)

    def _resolve(self) -> Any:
        # Check the flag without the lock first (fast path once resolved), then
        # re-check under the lock so concurrent first accesses run the factory
        # only once. ``_resolved`` is set last so a raising factory leaves the
        # proxy unresolved.
        if not self._resolved:
            with self._lock:
                if not self._resolved:
                    self._wrapped = self._factory()
                    self._resolved = True
        return self._wrapped

__dir__()

Return the directory list of attributes from the resolved target object.

:returns: List of attributes available on the wrapped object.

Source code in src/disdantic/loading.py
def __dir__(self) -> list[str]:
    """Return the directory list of attributes from the resolved target object.

    Calls ``self._resolve()`` first, so invoking ``dir()`` on an unresolved
    proxy runs the factory.

    :returns: List of attributes available on the wrapped object.
    """
    return dir(self._resolve())

__getattr__(name)

Retrieve attributes from the lazily resolved target object.

:param name: The name of the attribute to retrieve. :returns: The attribute value from the resolved object.

Source code in src/disdantic/loading.py
def __getattr__(self, name: str) -> Any:
    """Retrieve attributes from the lazily resolved target object.

    Only invoked when normal lookup on the proxy fails, so the proxy's own
    state attributes normally never reach this method.

    :param name: The name of the attribute to retrieve.
    :returns: The attribute value from the resolved object.
    :raises AttributeError: If ``name`` is one of the proxy's own state
        attributes and it is missing, or if the resolved object has no such
        attribute.
    """
    # If the proxy's own state is missing (instance created without __init__,
    # e.g. while copy/pickle rebuilds it), delegating would call _resolve(),
    # which reads ``_resolved`` and re-enters this method forever. Fail fast
    # with AttributeError instead.
    if name in ("_factory", "_wrapped", "_resolved", "_lock"):
        raise AttributeError(
            f"{type(self).__name__!r} object has no attribute {name!r}"
        )
    return getattr(self._resolve(), name)

__init__(factory)

Initialize the lazy proxy wrapper with a factory callable.

:param factory: A zero-argument callable returning the target object to wrap.

Source code in src/disdantic/loading.py
def __init__(self, factory: Callable[[], Any]) -> None:
    """Initialize the lazy proxy wrapper with a factory callable.

    The factory is stored, not called; no target object exists yet.

    :param factory: A zero-argument callable returning the target object to wrap.
    """
    # Callable invoked by _resolve() to build the target.
    self._factory = factory
    # Holds the factory's result once resolved; None until then.
    self._wrapped: Any = None
    # Separate flag so that a factory legitimately returning None still counts
    # as resolved.
    self._resolved = False
    # Serialises the first resolution across threads.
    self._lock = threading.Lock()

__repr__()

Return a string representation of the proxy or the wrapped target.

:returns: A string indicating initialization state or the wrapped representation.

Source code in src/disdantic/loading.py
def __repr__(self) -> str:
    """Return a string representation of the proxy or the wrapped target.

    Does not trigger resolution: an unresolved proxy reports its pending
    factory, a resolved one returns ``repr()`` of the wrapped object.

    :returns: A string indicating initialization state or the wrapped
        representation.
    """
    if not self._resolved:
        return f"<LazyProxy for uninitialized factory: {self._factory}>"
    return repr(self._wrapped)

LoggingSettings

Bases: BaseSettings

Settings configuration for the disdantic logging subsystem.

This class defines the configuration schema for logging, loading parameters from environment variables prefixed with DISDANTIC__LOGGING__ or via direct instantiation. It allows customizing the output sink, log level, format template, OpenTelemetry integration, and thread-safe queueing.

Example

.. code-block:: python

from disdantic.logging import LoggingSettings, configure_logger

settings = LoggingSettings(
    enabled=True,
    level="DEBUG",
    sink="stdout"
)
configure_logger(settings)

:cvar model_config: Configuration dictionary dictating environment variable prefixes and nested delimiters.

Source code in src/disdantic/logging.py
class LoggingSettings(BaseSettings):
    """
    Settings configuration for the disdantic logging subsystem.

    This class defines the configuration schema for logging, loading parameters
    from environment variables prefixed with `DISDANTIC__LOGGING__` or via
    direct instantiation. It allows customizing the output sink, log level,
    format template, OpenTelemetry integration, and thread-safe queueing.

    String values for ``sink`` that name a standard stream (``stdout``,
    ``sys.stdout``, ``stderr``, ``sys.stderr``, matched case-insensitively) are
    replaced by the corresponding stream object before validation; any other
    value is kept as given.

    Example:
        .. code-block:: python

            from disdantic.logging import LoggingSettings, configure_logger

            settings = LoggingSettings(
                enabled=True,
                level="DEBUG",
                sink="stdout"
            )
            configure_logger(settings)

    :cvar model_config: Configuration dictionary dictating environment variable
                        prefixes and nested delimiters.
    """

    # Master switch; logging is off unless explicitly enabled.
    enabled: bool = Field(
        default=False,
        description=(
            "Enables or disables logging output across the disdantic package."
        ),
    )
    clear_loggers: bool = Field(
        default=False,
        description=(
            "Configures whether all existing active logger sinks "
            "are removed prior to setup."
        ),
    )
    # The default is the ``sys.stderr`` object as it exists when this class
    # body is executed.
    sink: str | Any = Field(
        default=sys.stderr,
        description=(
            "Specifies and maps the output target, such as standard streams "
            "(stdout, stderr) or a file path, for log messages."
        ),
    )
    level: str = Field(
        default="WARNING",
        description=(
            "Configures the minimum severity level required for log messages "
            "to be emitted."
        ),
    )
    otel_formatting: Literal["auto", "enable", "disable"] = Field(
        default="auto",
        description=(
            "Configures the OpenTelemetry-compliant JSON formatting option "
            "(auto, enable, or disable)."
        ),
    )
    # The default template is two adjacent string literals joined by the
    # parser; it ends with an explicit newline.
    format: str | Callable[..., Any] | None = Field(
        default="<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | "
        "<cyan>{name}</cyan>:<cyan>{function}</cyan> - <level>{message}</level>\n",
        description=(
            "Configures the standard text template layout for emitted log lines."
        ),
    )
    filter: Any = Field(
        default=True,
        description=(
            "Configures the filtering criteria, using a prefix string, "
            "list of prefixes, or a filter function."
        ),
    )
    enqueue: bool = Field(
        default=True,
        description="Enables or disables asynchronous, thread-safe message queueing.",
    )
    # default_factory gives every settings instance its own dict.
    kwargs: dict[str, Any] = Field(
        default_factory=dict,
        description=(
            "Maps additional custom arguments passed directly "
            "to the loguru add handler."
        ),
    )

    model_config: ClassVar[SettingsConfigDict] = SettingsConfigDict(
        env_prefix="DISDANTIC__LOGGING__",
        env_nested_delimiter="__",
    )

    @field_validator("sink", mode="before")
    @classmethod
    def _parse_sink(cls, value: Any) -> Any:
        # Convert string aliases for standard output/error streams to stream objects.
        # Non-string values are returned untouched.
        if isinstance(value, str):
            # Built on every call, so the streams are looked up at validation
            # time rather than at import time.
            mapping = {
                "stdout": sys.stdout,
                "sys.stdout": sys.stdout,
                "stderr": sys.stderr,
                "sys.stderr": sys.stderr,
            }
            # Alias lookup is case-insensitive; unknown strings fall through
            # unchanged with their original casing.
            return mapping.get(value.lower(), value)
        return value

PydanticClassRegistryMixin

Bases: ReloadableBaseModel, RegistryMixin[type[BaseModelT]], ABC, Generic[BaseModelT]

Polymorphic serialization wrapper using dynamic tagged unions.

This class enables dynamic Pydantic model registration and builds tagged union schemas. It automatically routes JSON validation to the correct subclass using a discriminator key.

Example

.. code-block:: python

from disdantic.registry import PydanticClassRegistryMixin
from pydantic import BaseModel

class BaseMessage(PydanticClassRegistryMixin):
    pass

@BaseMessage.register("text")
class TextMessage(BaseMessage):
    content: str

:var schema_discriminator: The serialized tag field name used to identify the target model type.

Source code in src/disdantic/registry.py
class PydanticClassRegistryMixin(
    ReloadableBaseModel, RegistryMixin[type[BaseModelT]], ABC, Generic[BaseModelT]
):
    """
    Polymorphic serialization wrapper using dynamic tagged unions.

    This class enables dynamic Pydantic model registration and builds tagged
    union schemas. It automatically routes JSON validation to the correct subclass
    using a discriminator key. The discriminator value in incoming dictionaries
    is matched case-insensitively and normalised to the registered name before
    the tagged union is evaluated.

    Example:
        .. code-block:: python

            from disdantic.registry import PydanticClassRegistryMixin
            from pydantic import BaseModel

            class BaseMessage(PydanticClassRegistryMixin):
                pass

            @BaseMessage.register("text")
            class TextMessage(BaseMessage):
                content: str

    :var schema_discriminator: The serialized tag field name used to identify the
        target model type.
    """

    # 1. Public static / Class-level attributes and properties
    schema_discriminator: ClassVar[str] = "model_type"
    """The serialized tag field name used to identify the target model type."""

    # 2. Public static / Class methods
    @classmethod
    def get_schema_discriminator(cls) -> str:
        """
        Retrieve the active schema discriminator key.

        Resolves to the 'schema_discriminator' defined directly on this class
        (inherited values are not considered) or falls back to the global
        settings default.

        :returns: The string key name of the discriminator.
        """
        if "schema_discriminator" in cls.__dict__:
            return cls.schema_discriminator
        return get_settings().default_schema_discriminator

    @classmethod
    def registered_classes(cls) -> tuple[type[BaseModelT], ...]:
        """
        Return all registered BaseModel subclasses in this registry.

        Delegates to ``registered_objects()`` and rejects an empty result.

        Example:
            .. code-block:: python

                classes = BaseMessage.registered_classes()

        :raises EmptyRegistryError: If no classes are registered in the registry.
        :returns: A tuple of registered subclass types.
        """
        registered = cls.registered_objects()
        if not registered:
            raise EmptyRegistryError(
                f"No objects are currently present within the {cls.__name__} "
                f"registry setup."
            )
        return registered

    # 3. Public instance constructors (__init__) and other dunder methods
    @classmethod
    def __get_pydantic_core_schema__(
        cls, source_type: Any, handler: GetCoreSchemaHandler
    ) -> CoreSchema:
        # Builds Pydantic core schemas via dynamic tagged unions.
        # Private hook. Documentation omitted to adhere to visibility constraints.
        base_type = cls.__pydantic_schema_base_type__()
        if source_type == base_type:
            if not cls.registry:
                return cls.__pydantic_generate_base_schema__(handler)

            def check_discriminator_lookahead(value: Any) -> Any:
                # Runs before the tagged union: rewrites the discriminator value
                # of dict inputs to the registered (canonical) casing so the
                # union lookup, which is keyed by canonical names, succeeds.
                discriminator_key = cls.get_schema_discriminator()
                if not isinstance(value, dict):
                    return value

                discriminator_value = value.get(discriminator_key)
                if discriminator_value is None:
                    # Missing tag: leave reporting to the tagged union itself.
                    return value

                discriminator_str = str(discriminator_value)
                lower_match = cls._lower_registry.get(discriminator_str.lower())
                if not lower_match:
                    raise DiscriminatorNotFoundError(
                        discriminator_str, list(cls.registry.keys())
                    )

                # Apply lookahead casing transformation to satisfy lookups
                for canonical_name, model_class in cls.registry.items():
                    if model_class is lower_match:
                        # Hand a shallow copy to the union instead of writing
                        # into the caller's dictionary: validation must not
                        # modify the input it was given.
                        return {**value, discriminator_key: canonical_name}
                return value

            choices = {
                canonical_name: handler(model_class)
                for canonical_name, model_class in cls.registry.items()
            }
            union_schema = core_schema.tagged_union_schema(
                choices=choices,
                discriminator=cls.get_schema_discriminator(),
            )
            return core_schema.no_info_before_validator_function(
                check_discriminator_lookahead, union_schema
            )

        return handler(cls)

    @classmethod
    def __pydantic_schema_base_type__(cls) -> type[BaseModelT]:
        # Identifies the root base type context of the polymorphic schema loop.
        # Private hook. Documentation omitted to adhere to visibility constraints.
        # Walks the MRO and returns the class immediately preceding the mixin
        # (plain or parametrised); falls back to ``cls`` when the mixin is not
        # found after position 0.
        for index, base_class in enumerate(cls.__mro__):
            is_registry = base_class.__name__ == "PydanticClassRegistryMixin"
            is_registry_generic = base_class.__name__.startswith(
                "PydanticClassRegistryMixin["
            )
            if (is_registry or is_registry_generic) and index > 0:
                return cast("type[BaseModelT]", cls.__mro__[index - 1])
        return cast("type[BaseModelT]", cls)

    @classmethod
    def __pydantic_generate_base_schema__(
        cls, handler: GetCoreSchemaHandler
    ) -> CoreSchema:
        # Generates base core schema when no subclasses are registered.
        # Private hook. Documentation omitted to adhere to visibility constraints.
        return core_schema.any_schema()

    # 4. Public instance properties (None)

    # 5. Public instance methods (None)

    # 6. Private methods (prefixed with _):
    @classmethod
    def _validate_registration_target(cls, target_object: RegisterT) -> None:
        # Only classes deriving from pydantic's BaseModel may be registered.
        if not isinstance(target_object, type) or not issubclass(
            target_object, BaseModel
        ):
            raise TypeError(
                f"Cannot register "
                f"'{getattr(target_object, '__name__', str(target_object))}': "
                f"must extend Pydantic BaseModel."
            )

    @classmethod
    def _on_registry_change(cls) -> None:
        cls._trigger_base_rebuild()

    @classmethod
    def _trigger_base_rebuild(cls) -> None:
        # Traverses and triggers schema reloading for the base class.
        # Private hook. Documentation omitted to adhere to visibility constraints.
        if not get_settings().enable_schema_rebuilding:
            return

        base_type = cls.__pydantic_schema_base_type__()
        if issubclass(base_type, ReloadableBaseModel):
            base_type.reload_schema()
        else:
            base_type.model_rebuild(force=True)

schema_discriminator = 'model_type' class-attribute

The serialized tag field name used to identify the target model type.

get_schema_discriminator() classmethod

Retrieve the active schema discriminator key.

Resolves to the class-level 'schema_discriminator' or falls back to the global settings default.

:returns: The string key name of the discriminator.

Source code in src/disdantic/registry.py
@classmethod
def get_schema_discriminator(cls) -> str:
    """
    Return the discriminator key in effect for this class.

    A 'schema_discriminator' defined directly in the class's own namespace
    wins; when the class does not define one itself, the global settings
    default is used instead.

    :returns: The string key name of the discriminator.
    """
    own_namespace = vars(cls)
    if "schema_discriminator" not in own_namespace:
        return get_settings().default_schema_discriminator
    return cls.schema_discriminator

registered_classes() classmethod

Return all registered BaseModel subclasses in this registry.

Triggers auto-discovery on access if enabled.

Example

.. code-block:: python

classes = BaseMessage.registered_classes()

:raises EmptyRegistryError: If no classes are registered in the registry. :returns: A tuple of registered subclass types.

Source code in src/disdantic/registry.py
@classmethod
def registered_classes(cls) -> tuple[type[BaseModelT], ...]:
    """
    Return every BaseModel subclass currently held by this registry.

    Delegates to ``registered_objects()`` and refuses to return an empty result.

    Example:
        .. code-block:: python

            classes = BaseMessage.registered_classes()

    :raises EmptyRegistryError: If no classes are registered in the registry.
    :returns: A tuple of registered subclass types.
    """
    found = cls.registered_objects()
    if found:
        return found
    raise EmptyRegistryError(
        f"No objects are currently present within the {cls.__name__} "
        f"registry setup."
    )

RegistryDiagnostics

Bases: BaseModel

Configuration, model metadata, and integrity status of a registry.

This class tracks the metadata of a single registry subclassing RegistryMixin. It records the dynamic auto-discovery configuration, registered sub-models, and any orphaned subclasses that were imported but failed to register.

Examples:

.. code-block:: python

from disdantic.diagnose import verify_registries

report = verify_registries()
for registry in report.registries:
    print(f"Registry: {registry.registry_name}")
    print(f"Orphaned subclasses: {registry.orphans}")
Source code in src/disdantic/diagnose.py
class RegistryDiagnostics(BaseModel):
    """Configuration, model metadata, and integrity status of a registry.

    This class tracks the metadata of a single registry subclassing RegistryMixin. It
    records the dynamic auto-discovery configuration, registered sub-models, and any
    orphaned subclasses that were imported but failed to register.

    ``registry_name``, ``discriminator_key`` and ``auto_discovery_enabled`` have no
    default and must be supplied; ``models`` and ``orphans`` default to empty lists.

    Examples:
        .. code-block:: python

            from disdantic.diagnose import verify_registries

            report = verify_registries()
            for registry in report.registries:
                print(f"Registry: {registry.registry_name}")
                print(f"Orphaned subclasses: {registry.orphans}")
    """

    # Required: name of the registry base class this record describes.
    registry_name: str = Field(
        description="Maps the name of the registry base class under diagnostic check."
    )
    # Required: tag field name used by the registry's polymorphic schema.
    discriminator_key: str = Field(
        description=(
            "Configures the attribute name used to identify model types in polymorphic "
            "schemas."
        )
    )
    # Required: whether the registry scans packages to register models.
    auto_discovery_enabled: bool = Field(
        description=(
            "Enables package scanning and dynamic registration for this registry."
        )
    )
    # Optional: one entry per registered model; a fresh list per instance.
    models: list[RegistryModelInfo] = Field(
        default_factory=list,
        description=(
            "Configures the list of registered models and their compilation states."
        ),
    )
    # Optional: names of imported subclasses without a registry key.
    orphans: list[str] = Field(
        default_factory=list,
        description="Maps imported subclasses that are not registered with any key.",
    )

RegistryManager

Orchestrate tracking and listing active registry namespaces globally.

This class scans the runtime subclass tree of RegistryMixin, discovers registered classes, and generates maps linking registries to their registered components.

Example

.. code-block:: python

from disdantic.registry import RegistryManager

registries = RegistryManager.list_registries()
Source code in src/disdantic/registry.py
class RegistryManager:
    """
    Orchestrate tracking and listing active registry namespaces globally.

    The manager walks the runtime subclass tree of RegistryMixin, collects the
    classes that act as registries, and reports what each of them currently
    has registered.

    Example:
        .. code-block:: python

            from disdantic.registry import RegistryManager

            registries = RegistryManager.list_registries()
    """

    @classmethod
    def list_registries(cls) -> dict[str, dict[str, str]]:
        """
        Generate a nested map of all active registries and their contents.

        Registries are listed in name order; inside each registry the keys are
        sorted and every registered object is rendered as a dotted path.

        Example:
            .. code-block:: python

                mapping = RegistryManager.list_registries()

        :returns: A dictionary mapping registry class names to a nested dict
            of registered keys and target class paths.
        """
        return {
            registry_class.__name__: {
                key: cls._resolve_val_path(registry_class.registry[key])
                for key in sorted(registry_class.registry.keys())
            }
            for registry_class in cls._discover_registries()
        }

    @classmethod
    def _discover_registries(cls) -> list[type[RegistryMixin[Any]]]:
        # Collects every RegistryMixin descendant that is either a direct child
        # of one of the mixin bases or already holds registrations, sorted by
        # class name.
        mixin_names = ("RegistryMixin", "PydanticClassRegistryMixin")

        if get_settings().auto_packages:

            class GlobalImporter(AutoImporterMixin):
                pass

            # Import the configured packages first so their registries exist
            # in the subclass tree scanned below.
            GlobalImporter.auto_import_package_modules()

        discovered = []
        for candidate in cls._get_subclasses(RegistryMixin):
            if candidate.__name__ in mixin_names:
                continue

            registry_cls = cast("type[RegistryMixin[Any]]", candidate)
            if registry_cls.is_auto_discovery_enabled():
                # Best effort: a failing auto-population must not abort the scan.
                with contextlib.suppress(Exception):
                    registry_cls.auto_populate_registry()

            # A "root" registry lists one of the mixins (plain or parametrised)
            # directly among its bases.
            extends_mixin = False
            for base_class in registry_cls.__bases__:
                origin = getattr(base_class, "__origin__", base_class)
                if getattr(origin, "__name__", "") in mixin_names:
                    extends_mixin = True
                    break

            if extends_mixin or bool(registry_cls.registry):
                discovered.append(registry_cls)

        discovered.sort(key=lambda registry_class: registry_class.__name__)
        return discovered

    @staticmethod
    def _get_subclasses(target_class: type) -> set[type]:
        # Recursively gathers subclasses whose defining module is still present
        # in sys.modules; children of a skipped subclass are not visited.
        direct = set()
        for sub in target_class.__subclasses__():
            module_name = getattr(sub, "__module__", None)
            if isinstance(module_name, str) and module_name in sys.modules:
                direct.add(sub)

        collected = set(direct)
        for sub in direct:
            collected |= RegistryManager._get_subclasses(sub)
        return collected

    @staticmethod
    def _resolve_val_path(val: Any) -> str:
        # Renders "module.name" for objects exposing both attributes, otherwise
        # falls back to the dotted path of the object's class.
        module_name = getattr(val, "__module__", None)
        object_name = getattr(val, "__name__", None)
        if not (module_name and object_name):
            owner = val.__class__
            return f"{owner.__module__}.{owner.__name__}"
        return f"{module_name}.{object_name}"

list_registries() classmethod

Generate a nested map of all active registries and their contents.

Scans the runtime subclass tree of RegistryMixin, discovers registered classes, and generates maps linking registries to their registered components.

Example

.. code-block:: python

mapping = RegistryManager.list_registries()

:returns: A dictionary mapping registry class names to a nested dict of registered keys and target class paths.

Source code in src/disdantic/registry.py
@classmethod
def list_registries(cls) -> dict[str, dict[str, str]]:
    """
    Generate a nested map of all active registries and their contents.

    Registries come from ``_discover_registries()`` in the order it returns
    them; inside each registry the keys are sorted and every registered object
    is rendered as a dotted path by ``_resolve_val_path()``.

    Example:
        .. code-block:: python

            mapping = RegistryManager.list_registries()

    :returns: A dictionary mapping registry class names to a nested dict
        of registered keys and target class paths.
    """
    return {
        registry_class.__name__: {
            key: cls._resolve_val_path(registry_class.registry[key])
            for key in sorted(registry_class.registry.keys())
        }
        for registry_class in cls._discover_registries()
    }

RegistryMixin

Bases: Generic[RegistryObjT], AutoImporterMixin

Isolated registry namespace tracking across distinct base class domains.

This mixin provides base class structures with automated tracking mappings, enabling subclasses to map dynamic components and discover submodules recursively. Each registry is fully isolated to avoid leakage between unrelated interfaces. It supports registration decorators, lookup checks, and programmatic unregistration.

Example

.. code-block:: python

from disdantic.registry import RegistryMixin

class ComponentRegistry(RegistryMixin[type]):
    pass

@ComponentRegistry.register("service")
class MyService:
    pass

:var registry: The canonical tracking directory mapping identifiers to registered objects. :var registry_auto_discovery: Controls whether sub-module scanning is automatically triggered on access. :var registry_populated: Tracks if the auto-population routine has completed for this namespace.

Source code in src/disdantic/registry.py
class RegistryMixin(Generic[RegistryObjT], AutoImporterMixin):
    """
    Isolated registry namespace tracking across distinct base class domains.

    This mixin provides base class structures with automated tracking mappings,
    enabling subclasses to map dynamic components and discover submodules recursively.
    Each registry is fully isolated to avoid leakage between unrelated interfaces.
    It supports registration decorators, lookup checks, and programmatic unregistration.

    Example:
        .. code-block:: python

            from disdantic.registry import RegistryMixin

            class ComponentRegistry(RegistryMixin[type]):
                pass

            @ComponentRegistry.register("service")
            class MyService:
                pass

    :var registry: The canonical tracking directory mapping identifiers to
        registered objects.
    :var registry_auto_discovery: Controls whether sub-module scanning is
        automatically triggered on access.
    :var registry_populated: Tracks if the auto-population routine has
        completed for this namespace.
    """

    # 1. Public static / Class-level attributes and properties
    registry: ClassVar[dict[str, Any]]
    """The canonical tracking directory mapping identifiers to registered objects."""

    registry_auto_discovery: ClassVar[bool] = False
    """Controls whether sub-module scanning is automatically triggered on access."""

    registry_populated: ClassVar[bool] = False
    """Tracks if the auto-population routine has completed for this namespace."""

    # Lower-cased mirror of ``registry`` used for case-insensitive lookups, and the
    # re-entrant lock guarding both mappings. Both are created per subclass in
    # ``__init_subclass__``.
    _lower_registry: ClassVar[dict[str, Any]]
    _registry_lock: ClassVar[threading.RLock]

    # 2. Public static / Class methods
    @classmethod
    def is_auto_discovery_enabled(cls) -> bool:
        """
        Determine if automatic module discovery is active for this registry.

        Checks both the class-level configuration option and the fallback
        global settings to decide if submodules should be scanned.

        :returns: True if auto-discovery is enabled, False otherwise.
        """
        if getattr(cls, "registry_auto_discovery", False):
            return True
        return get_settings().registry_auto_discovery

    @classmethod
    def register(
        cls, name: str | Sequence[str] | None = None
    ) -> Callable[[RegisterT], RegisterT]:
        """
        Decorate subclass implementations to register them under name keys.

        Registers the decorated class or object under the specified name or
        list of names. If no name is provided, uses the class name.

        Example:
            .. code-block:: python

                @MyRegistry.register("custom_name")
                class SubComponent:
                    pass

        :param name: Optional registration keys. Can be a single string, a sequence
            of strings, or None to default to the target's name.
        :returns: A decorator function that registers the target object.
        """

        def _decorator(target_object: RegisterT) -> RegisterT:
            cls.register_decorator(target_object, name=name)
            return target_object

        return _decorator

    @classmethod
    def register_decorator(
        cls, target_object: RegisterT, name: str | Sequence[str] | None = None
    ) -> RegisterT:
        """
        Index target objects directly into namespace mappings.

        Performs collision checking to prevent duplicate registration within the
        same namespace. All keys are validated before any mapping is modified, so
        a rejected call leaves the registry exactly as it was.

        Example:
            .. code-block:: python

                MyRegistry.register_decorator(MyService, name="service")

        :param target_object: The class or object instance to register.
        :param name: Optional registration keys. Can be a single string, a sequence
            of strings, or None to default to the target's name.
        :raises ValueError: If the naming format is unsupported.
        :raises TypeError: If a key is not a string.
        :raises RegistryCollisionError: If a key is already registered, or is
            repeated within the same call.
        :returns: The original target object after successful registration.
        """
        cls._validate_registration_target(target_object)
        with cls._registry_lock:
            if name is None:
                resolved_names: list[str] = [
                    getattr(target_object, "__name__", str(target_object))
                ]
            elif isinstance(name, str):
                resolved_names = [name]
            elif isinstance(name, Sequence):
                resolved_names = list(name)
            else:
                raise ValueError(f"Unsupported naming format provided: {type(name)}")

            # Validate every key first: mutating while validating would leave
            # earlier names registered (without a change notification) when a
            # later name turns out to be invalid or already taken.
            accepted: set[str] = set()
            for resolved_name in resolved_names:
                if not isinstance(resolved_name, str):
                    raise TypeError(
                        f"Registry keys must explicitly be strings. "
                        f"Got: {type(resolved_name)}"
                    )

                if resolved_name in cls.registry or resolved_name in accepted:
                    raise RegistryCollisionError(
                        f"Collision detected: '{resolved_name}' already exists within "
                        f"{cls.__name__} registry."
                    )
                accepted.add(resolved_name)

            for resolved_name in resolved_names:
                cls.registry[resolved_name] = target_object
                cls._lower_registry[resolved_name.lower()] = target_object

            cls._on_registry_change()
            return target_object

    @classmethod
    def auto_populate_registry(cls) -> bool:
        """
        Scan packages and automatically populate the registry namespace.

        Discovers and imports modules dynamically if auto-discovery is enabled and
        the registry has not been populated yet.

        Example:
            .. code-block:: python

                MyRegistry.auto_populate_registry()

        :raises AutoPopulationError: If auto-discovery is disabled on the
            registry class.
        :returns: True if the registry was newly populated, False if it was
            already populated.
        """
        with cls._registry_lock:
            if not cls.is_auto_discovery_enabled():
                raise AutoPopulationError(
                    f"Auto-population rejected: registry_auto_discovery is "
                    f"disabled on {cls.__name__}."
                )

            if cls.registry_populated:
                return False

            cls.auto_import_package_modules()
            cls.registry_populated = True
            cls._on_registry_change()
            return True

    @classmethod
    def registered_objects(cls) -> tuple[RegistryObjT, ...]:
        """
        Retrieve all registered objects in the registry tracking frame.

        Triggers auto-population if auto-discovery is enabled before returning
        the objects.

        Example:
            .. code-block:: python

                objects = MyRegistry.registered_objects()

        :returns: A tuple of all registered objects.
        """
        with cls._registry_lock:
            if cls.is_auto_discovery_enabled():
                with contextlib.suppress(AutoPopulationError):
                    cls.auto_populate_registry()
            return tuple(cls.registry.values())

    @classmethod
    def is_registered(cls, name: str) -> bool:
        """
        Verify presence of an identifier in the registry.

        Performs case-insensitive checks against the registered keys.

        Example:
            .. code-block:: python

                exists = MyRegistry.is_registered("custom_name")

        :param name: The identifier key to verify.
        :returns: True if the identifier is registered, False otherwise.
        """
        with cls._registry_lock:
            if cls.is_auto_discovery_enabled():
                with contextlib.suppress(AutoPopulationError):
                    cls.auto_populate_registry()
            return name in cls.registry or name.lower() in cls._lower_registry

    @classmethod
    def get_registered_object(cls, name: str) -> RegistryObjT | None:
        """
        Look up a registered object by its identifier.

        Looks up the given name directly in the canonical registry, falling back to a
        case-insensitive lookup in the lowercase registry. An exact match is always
        returned as-is, even when the registered object evaluates as false.

        Example:
            .. code-block:: python

                obj = MyRegistry.get_registered_object("custom_name")

        :param name: The identifier key of the registered object.
        :returns: The registered object if found, or None.
        """
        with cls._registry_lock:
            if cls.is_auto_discovery_enabled():
                with contextlib.suppress(AutoPopulationError):
                    cls.auto_populate_registry()
            # Test membership rather than truthiness: a falsy registered object
            # (empty container, class with a zero ``__len__``, ...) must not fall
            # through to the case-insensitive table, which may hold another object.
            if name in cls.registry:
                return cls.registry[name]
            return cls._lower_registry.get(name.lower())

    @classmethod
    def clear_registry(cls) -> None:
        """
        Clear all active registrations and reset import states.

        Clears all mappings in canonical and lowercase registries, and resets flags
        so that subsequent operations can re-populate the registry.

        Example:
            .. code-block:: python

                MyRegistry.clear_registry()

        :returns: None.
        """
        with cls._registry_lock:
            cls.registry.clear()
            cls._lower_registry.clear()
            cls.registry_populated = False
            cls.reset_importer_cache()
            cls._on_registry_change()

    @classmethod
    def unregister(cls, name: str) -> None:
        """
        Remove a registered identifier from both tracking mappings.

        Removes from canonical and case-insensitive mapping caches. If it is
        a PydanticClassRegistryMixin, triggers a core schema rebuild of the
        base registry class and its registered hierarchy.

        Example:
            .. code-block:: python

                MyRegistry.unregister("custom_name")

        :param name: The registered identifier/token to remove.
        :raises ValueError: If the token is not present in the registry.
        :returns: None.
        """
        with cls._registry_lock:
            # Matching is case-insensitive, so several canonical keys that differ
            # only by case are all removed together.
            keys_to_remove = [
                key for key in cls.registry if key.lower() == name.lower()
            ]
            if not keys_to_remove:
                raise ValueError(
                    f"Identifier '{name}' is not present in "
                    f"the {cls.__name__} registry."
                )
            for key in keys_to_remove:
                cls.registry.pop(key, None)
                cls._lower_registry.pop(key.lower(), None)
            cls._on_registry_change()

    # 3. Public instance constructors (__init__) and other dunder methods
    def __init_subclass__(cls, **kwargs: Any) -> None:
        # Private hook. Documentation omitted to adhere to visibility constraints.
        # Every subclass gets its own mappings, flag and lock so that registries
        # never share state with their parents or siblings.
        super().__init_subclass__(**kwargs)
        cls.registry = {}
        cls._lower_registry = {}
        cls.registry_populated = False
        cls._registry_lock = threading.RLock()

    # 4. Public instance properties (None)

    # 5. Public instance methods (None)

    # 6. Private methods (prefixed with _):
    @classmethod
    def _validate_registration_target(cls, target_object: RegisterT) -> None:
        # Extension point called before registration; a no-op in this base class.
        pass

    @classmethod
    def _on_registry_change(cls) -> None:
        # Extension point called after the registry mappings or flags change;
        # a no-op in this base class.
        pass

registry class-attribute

The canonical tracking directory mapping identifiers to registered objects.

registry_auto_discovery = False class-attribute

Controls whether sub-module scanning is automatically triggered on access.

registry_populated = False class-attribute

Tracks if the auto-population routine has completed for this namespace.

auto_populate_registry() classmethod

Scan packages and automatically populate the registry namespace.

Discovers and imports modules dynamically if auto-discovery is enabled and the registry has not been populated yet.

Example

.. code-block:: python

MyRegistry.auto_populate_registry()

:raises AutoPopulationError: If auto-discovery is disabled on the registry class. :returns: True if the registry was newly populated, False if it was already populated.

Source code in src/disdantic/registry.py
@classmethod
def auto_populate_registry(cls) -> bool:
    """
    Import the configured package modules once to fill this registry.

    While holding the registry lock, refuses to run when auto-discovery is
    disabled; otherwise imports the package modules the first time it is
    called, marks the registry as populated, and fires the change hook.

    Example:
        .. code-block:: python

            MyRegistry.auto_populate_registry()

    :raises AutoPopulationError: If auto-discovery is disabled on the
        registry class.
    :returns: True if this call performed the population, False if the
        registry had already been populated.
    """
    with cls._registry_lock:
        if not cls.is_auto_discovery_enabled():
            raise AutoPopulationError(
                f"Auto-population rejected: registry_auto_discovery is "
                f"disabled on {cls.__name__}."
            )

        needs_population = not cls.registry_populated
        if needs_population:
            cls.auto_import_package_modules()
            cls.registry_populated = True
            cls._on_registry_change()
        return needs_population

clear_registry() classmethod

Clear all active registrations and reset import states.

Clears all mappings in canonical and lowercase registries, and resets flags so that subsequent operations can re-populate the registry.

Example

.. code-block:: python

MyRegistry.clear_registry()

:returns: None.

Source code in src/disdantic/registry.py
@classmethod
def clear_registry(cls) -> None:
    """
    Drop every registration and return the registry to its unpopulated state.

    Under the registry lock, empties both the canonical and the lowercase
    mapping, clears the populated flag, resets the importer cache, and then
    fires the change hook.

    Example:
        .. code-block:: python

            MyRegistry.clear_registry()

    :returns: None.
    """
    with cls._registry_lock:
        for mapping in (cls.registry, cls._lower_registry):
            mapping.clear()
        cls.registry_populated = False
        cls.reset_importer_cache()
        cls._on_registry_change()

get_registered_object(name) classmethod

Look up a registered object by its identifier.

Looks up the given name directly in the canonical registry, falling back to a case-insensitive lookup in the lowercase registry.

Example

.. code-block:: python

obj = MyRegistry.get_registered_object("custom_name")

:param name: The identifier key of the registered object. :returns: The registered object if found, or None.

Source code in src/disdantic/registry.py
@classmethod
def get_registered_object(cls, name: str) -> RegistryObjT | None:
    """
    Look up a registered object by its identifier.

    Looks up the given name directly in the canonical registry, falling back to a
    case-insensitive lookup in the lowercase registry. An exact match is always
    returned as-is, even when the registered object evaluates as false.

    Example:
        .. code-block:: python

            obj = MyRegistry.get_registered_object("custom_name")

    :param name: The identifier key of the registered object.
    :returns: The registered object if found, or None.
    """
    with cls._registry_lock:
        if cls.is_auto_discovery_enabled():
            with contextlib.suppress(AutoPopulationError):
                cls.auto_populate_registry()
        # Test membership rather than truthiness: a falsy registered object
        # (empty container, class with a zero ``__len__``, ...) must not fall
        # through to the case-insensitive table, which may hold another object.
        if name in cls.registry:
            return cls.registry[name]
        return cls._lower_registry.get(name.lower())

is_auto_discovery_enabled() classmethod

Determine if automatic module discovery is active for this registry.

Checks both the class-level configuration option and the fallback global settings to decide if submodules should be scanned.

:returns: True if auto-discovery is enabled, False otherwise.

Source code in src/disdantic/registry.py
@classmethod
def is_auto_discovery_enabled(cls) -> bool:
    """
    Report whether module auto-discovery applies to this registry.

    A truthy ``registry_auto_discovery`` attribute on the class enables it
    outright; otherwise the ``registry_auto_discovery`` value of the settings
    object returned by ``get_settings()`` is used.

    :returns: True if auto-discovery is enabled, False otherwise.
    """
    class_opt_in = getattr(cls, "registry_auto_discovery", False)
    return True if class_opt_in else get_settings().registry_auto_discovery

is_registered(name) classmethod

Verify presence of an identifier in the registry.

Performs case-insensitive checks against the registered keys.

Example

.. code-block:: python

exists = MyRegistry.is_registered("custom_name")

:param name: The identifier key to verify. :returns: True if the identifier is registered, False otherwise.

Source code in src/disdantic/registry.py
@classmethod
def is_registered(cls, name: str) -> bool:
    """
    Check whether an identifier is known to the registry.

    Runs auto-population first when auto-discovery is enabled, then looks for
    an exact match in the canonical mapping and, failing that, for the
    lower-cased name in the lowercase mapping.

    Example:
        .. code-block:: python

            exists = MyRegistry.is_registered("custom_name")

    :param name: The identifier key to verify.
    :returns: True if the identifier is registered, False otherwise.
    """
    with cls._registry_lock:
        discovery_on = cls.is_auto_discovery_enabled()
        if discovery_on:
            with contextlib.suppress(AutoPopulationError):
                cls.auto_populate_registry()
        if name in cls.registry:
            return True
        return name.lower() in cls._lower_registry

register(name=None) classmethod

Decorate subclass implementations to register them under name keys.

Registers the decorated class or object under the specified name or list of names. If no name is provided, uses the class name.

Example

.. code-block:: python

@MyRegistry.register("custom_name")
class SubComponent:
    pass

:param name: Optional registration keys. Can be a single string, a sequence of strings, or None to default to the target's name. :returns: A decorator function that registers the target object.

Source code in src/disdantic/registry.py
@classmethod
def register(
    cls, name: str | Sequence[str] | None = None
) -> Callable[[RegisterT], RegisterT]:
    """
    Build a decorator that registers its target in this registry.

    The returned decorator forwards the decorated object and the given
    ``name`` to ``register_decorator`` and hands the decorated object back
    unchanged.

    Example:
        .. code-block:: python

            @MyRegistry.register("custom_name")
            class SubComponent:
                pass

    :param name: Optional registration keys. Can be a single string, a sequence
        of strings, or None to default to the target's name.
    :returns: A decorator function that registers the target object.
    """

    def _decorator(candidate: RegisterT) -> RegisterT:
        # The return value of register_decorator is deliberately not used;
        # the decorated object itself is always what comes back.
        cls.register_decorator(candidate, name=name)
        return candidate

    return _decorator

register_decorator(target_object, name=None) classmethod

Index target objects directly into namespace mappings.

Performs collision checking to prevent duplicate registration within the same namespace.

Example

.. code-block:: python

MyRegistry.register_decorator(MyService, name="service")

:param target_object: The class or object instance to register. :param name: Optional registration keys. Can be a single string, a sequence of strings, or None to default to the target's name. :raises ValueError: If the naming format is unsupported. :raises TypeError: If a key is not a string. :raises RegistryCollisionError: If a key is already registered. :returns: The original target object after successful registration.

Source code in src/disdantic/registry.py
@classmethod
def register_decorator(
    cls, target_object: RegisterT, name: str | Sequence[str] | None = None
) -> RegisterT:
    """
    Index target objects directly into namespace mappings.

    Performs collision checking to prevent duplicate registration within the
    same namespace. All keys are validated before any mapping is modified, so
    a rejected call leaves the registry exactly as it was.

    Example:
        .. code-block:: python

            MyRegistry.register_decorator(MyService, name="service")

    :param target_object: The class or object instance to register.
    :param name: Optional registration keys. Can be a single string, a sequence
        of strings, or None to default to the target's name.
    :raises ValueError: If the naming format is unsupported.
    :raises TypeError: If a key is not a string.
    :raises RegistryCollisionError: If a key is already registered, or is
        repeated within the same call.
    :returns: The original target object after successful registration.
    """
    cls._validate_registration_target(target_object)
    with cls._registry_lock:
        if name is None:
            resolved_names: list[str] = [
                getattr(target_object, "__name__", str(target_object))
            ]
        elif isinstance(name, str):
            resolved_names = [name]
        elif isinstance(name, Sequence):
            resolved_names = list(name)
        else:
            raise ValueError(f"Unsupported naming format provided: {type(name)}")

        # Validate every key first: mutating while validating would leave
        # earlier names registered (without a change notification) when a
        # later name turns out to be invalid or already taken.
        accepted: set[str] = set()
        for resolved_name in resolved_names:
            if not isinstance(resolved_name, str):
                raise TypeError(
                    f"Registry keys must explicitly be strings. "
                    f"Got: {type(resolved_name)}"
                )

            if resolved_name in cls.registry or resolved_name in accepted:
                raise RegistryCollisionError(
                    f"Collision detected: '{resolved_name}' already exists within "
                    f"{cls.__name__} registry."
                )
            accepted.add(resolved_name)

        for resolved_name in resolved_names:
            cls.registry[resolved_name] = target_object
            cls._lower_registry[resolved_name.lower()] = target_object

        cls._on_registry_change()
        return target_object

registered_objects() classmethod

Retrieve all registered objects in the registry tracking frame.

Triggers auto-population if auto-discovery is enabled before returning the objects.

Example

.. code-block:: python

objects = MyRegistry.registered_objects()

:returns: A tuple of all registered objects.

Source code in src/disdantic/registry.py
@classmethod
def registered_objects(cls) -> tuple[RegistryObjT, ...]:
    """
    Return a snapshot of every object currently held by the registry.

    When auto-discovery is enabled, auto-population is attempted first and an
    ``AutoPopulationError`` from it is ignored. The values of the canonical
    mapping are then copied into a tuple while the registry lock is held.

    Example:
        .. code-block:: python

            objects = MyRegistry.registered_objects()

    :returns: A tuple of all registered objects.
    """
    with cls._registry_lock:
        discovery_on = cls.is_auto_discovery_enabled()
        if discovery_on:
            with contextlib.suppress(AutoPopulationError):
                cls.auto_populate_registry()
        snapshot = tuple(cls.registry.values())
    return snapshot

unregister(name) classmethod

Remove a registered identifier from both tracking mappings.

Removes from canonical and case-insensitive mapping caches. If it is a PydanticClassRegistryMixin, triggers a core schema rebuild of the base registry class and its registered hierarchy.

Example

.. code-block:: python

MyRegistry.unregister("custom_name")

:param name: The registered identifier/token to remove. :raises ValueError: If the token is not present in the registry. :returns: None.

Source code in src/disdantic/registry.py
@classmethod
def unregister(cls, name: str) -> None:
    """
    Drop an identifier from the canonical and lowercase mappings.

    Matching against canonical keys is case-insensitive, so every key that
    equals ``name`` once both are lower-cased is removed, together with its
    lowercase entry. The change hook is fired once after the removals.

    Example:
        .. code-block:: python

            MyRegistry.unregister("custom_name")

    :param name: The registered identifier/token to remove.
    :raises ValueError: If the token is not present in the registry.
    :returns: None.
    """
    with cls._registry_lock:
        # Collect matches up front so the mapping is not mutated while iterating.
        doomed = []
        for existing in cls.registry:
            if existing.lower() == name.lower():
                doomed.append(existing)

        if len(doomed) == 0:
            raise ValueError(
                f"Identifier '{name}' is not present in "
                f"the {cls.__name__} registry."
            )

        for existing in doomed:
            cls.registry.pop(existing, None)
            cls._lower_registry.pop(existing.lower(), None)
        cls._on_registry_change()

RegistryModelInfo

Bases: BaseModel

Metadata and compilation status of a registered subclass inside a registry.

This class captures the import details, registration key, and Pydantic schema compilation status of a single model registered under a RegistryMixin class.

Examples:

.. code-block:: python

from disdantic.diagnose import verify_registries

report = verify_registries()
for registry in report.registries:
    for model in registry.models:
        if model.compilation_status == "error":
            print(
                f"Model {model.class_name} compilation error: "
                f"{model.error_detail}"
            )
Source code in src/disdantic/diagnose.py
class RegistryModelInfo(BaseModel):
    """Metadata and compilation status of a registered subclass inside a registry.

    This class captures the import details, registration key, and Pydantic schema
    compilation status of a single model registered under a RegistryMixin class.

    Examples:
        .. code-block:: python

            from disdantic.diagnose import verify_registries

            report = verify_registries()
            for registry in report.registries:
                for model in registry.models:
                    if model.compilation_status == "error":
                        print(
                            f"Model {model.class_name} compilation error: "
                            f"{model.error_detail}"
                        )
    """

    # NOTE(review): the docstring and the Field descriptions are left untouched on
    # purpose — pydantic presumably surfaces them in the generated schema; confirm
    # before rewording any of them.

    # Required: the key under which the subclass was registered.
    key: str = Field(
        description="Maps the unique lookup key used to register this subclass."
    )
    # Required: name of the registered class.
    class_name: str = Field(description="Maps the name of the registered Python class.")
    # Required: import path of the module that defines the class.
    module_path: str = Field(
        description="Maps the import path of the module defining the model class."
    )
    # Required: typed as a plain string; the description names 'healthy' and
    # 'error' as the values, but nothing in this class restricts it to those.
    compilation_status: str = Field(
        description="Configures the health status, set to 'healthy' or 'error'."
    )
    # Optional: defaults to None; per the description it carries the failure text
    # when compilation did not succeed.
    error_detail: str | None = Field(
        default=None,
        description=(
            "Maps the traceback or validation error message if compilation failed."
        ),
    )

ReloadableBaseModel

Bases: BaseModel

Pydantic base model that dynamically cascades validation schema updates.

This class enables reloading of parent and dependent schemas when child or dependent schemas are dynamically updated at runtime. By subclassing ReloadableBaseModel, any modification to a child model can automatically trigger updates to parent models that reference the child model in their fields.

It functions by traversing Python's subclass tree and evaluating field annotations recursively, identifying models that reference a modified target. The class respects configuration flags from the global registry settings to selectively enable or disable rebuild propagation.

This class does not define any fields or class variables itself; it is intended solely as an abstract base class for reloadable models.

.. code-block:: python

from pydantic import Field
from disdantic.model import ReloadableBaseModel

class ChildModel(ReloadableBaseModel):
    value: str

class ParentModel(ReloadableBaseModel):
    child: ChildModel

# Rebuilding ChildModel will automatically cascade to ParentModel
ChildModel.reload_schema()
Source code in src/disdantic/model.py
class ReloadableBaseModel(BaseModel):
    """Pydantic base model that dynamically cascades validation schema updates.

    This class enables reloading of parent and dependent schemas when child or
    dependent schemas are dynamically updated at runtime. By subclassing
    `ReloadableBaseModel`, any modification to a child model can automatically
    trigger updates to parent models that reference the child model in their
    fields.

    It functions by traversing Python's subclass tree and evaluating field
    annotations recursively, identifying models that reference a modified
    target. The class respects configuration flags from the global registry
    settings to selectively enable or disable rebuild propagation.

    This class does not define any fields or class variables itself; it is
    intended solely as an abstract base class for reloadable models.

    .. code-block:: python

        from pydantic import Field
        from disdantic.model import ReloadableBaseModel

        class ChildModel(ReloadableBaseModel):
            value: str

        class ParentModel(ReloadableBaseModel):
            child: ChildModel

        # Rebuilding ChildModel will automatically cascade to ParentModel
        ChildModel.reload_schema()
    """

    @classmethod
    def reload_schema(cls, parents: bool = True) -> None:
        """Forces a compilation rebuild of the local core schema.

        This method triggers Pydantic's underlying model rebuilding process for
        the target model, forcing a compilation of its core schema. If parent
        cascading is requested and enabled globally, it also traverses the
        dependency tree to rebuild all models referencing this target model.

        Nothing happens at all (not even the local rebuild) when the
        ``enable_schema_rebuilding`` setting is off.

        .. code-block:: python

            ReloadableBaseModel.reload_schema(parents=True)

        :param parents: Specifies whether schema updates should propagate to
            dependent parent models. Propagation additionally requires the
            ``schema_rebuild_parents`` setting to be enabled.
        :returns: None.
        """
        settings = get_settings()

        # Global kill switch: skip both the local rebuild and the cascade.
        if not settings.enable_schema_rebuilding:
            return

        cls.model_rebuild(force=True)
        # Cascade only when both the caller and the global settings allow it.
        if parents and settings.schema_rebuild_parents:
            cls.reload_parent_schemas()

    @classmethod
    def reload_parent_schemas(cls) -> None:
        """Traverses subclasses and rebuilds all dependent parent schemas.

        This method scans all active subclasses of `BaseModel` in the runtime
        registry, identifies which of those models reference the current model
        class (or any of its parent classes in its MRO), and triggers a
        topological rebuild of those dependents.

        .. code-block:: python

            ReloadableBaseModel.reload_parent_schemas()

        :returns: None.
        """
        potential_parents: set[type[BaseModel]] = set()
        stack: list[type[BaseModel]] = [BaseModel]

        # Iterative walk over the BaseModel subclass tree.
        while stack:
            current = stack.pop()
            for subclass in current.__subclasses__():
                # ``cls`` itself is skipped and, because it is never pushed on
                # the stack, subclasses reachable only through ``cls`` are not
                # collected either. Classes whose defining module is no longer
                # present in ``sys.modules`` are ignored as well.
                if (
                    subclass is not cls
                    and subclass not in potential_parents
                    and hasattr(subclass, "__module__")
                    and isinstance(subclass.__module__, str)
                    and subclass.__module__ in sys.modules
                ):
                    potential_parents.add(subclass)
                    stack.append(subclass)

        # Run one rebuild pass per model class in the MRO (cls first), skipping
        # the framework bases BaseModel and ReloadableBaseModel.
        for check in cls.__mro__:
            if (
                isinstance(check, type)
                and issubclass(check, BaseModel)
                and check is not BaseModel
                and check is not ReloadableBaseModel
            ):
                cls._rebuild_dependents(check, potential_parents)

    @classmethod
    def _rebuild_dependents(
        cls,
        target: type[BaseModel],
        types: set[type[BaseModel]],
    ) -> None:
        """Rebuilds every model in ``types`` that transitively references ``target``.

        The dependents are rebuilt with ``model_rebuild(force=True)`` in the
        order produced by :meth:`_topological_sort`. ``target`` itself is not
        rebuilt here.

        :param target: The model class whose dependents should be rebuilt.
        :param types: Candidate model classes to inspect for references.
        :returns: None.
        """
        # Gather all checkable model classes and build the reference adjacency list.
        all_types = types | {target}
        dependents = cls._build_dependency_map(all_types)

        # Find transitively reachable dependents of the target (excluding target).
        reachable = cls._find_reachable(target, dependents)
        subgraph_nodes = reachable - {target}
        if not subgraph_nodes:
            return

        # Sort parent models topologically (dependencies before dependents).
        ordered = cls._topological_sort(subgraph_nodes, dependents)

        for parent_cls in ordered:
            parent_cls.model_rebuild(force=True)

    @classmethod
    def _build_dependency_map(
        cls, types: set[type[BaseModel]]
    ) -> dict[type[BaseModel], set[type[BaseModel]]]:
        """Builds a reverse-dependency map over ``types``.

        Every ordered pair of distinct models is compared, so the work grows
        quadratically with the number of candidate models.

        :param types: The model classes to compare pairwise.
        :returns: A mapping from each model to the set of models that have at
            least one field whose annotation references it.
        """
        dependents: dict[type[BaseModel], set[type[BaseModel]]] = {
            model_cls: set() for model_cls in types
        }

        # Map each model to the set of models that directly depend on it.
        for candidate in types:
            for possible_dep in types:
                # Fields with a falsy annotation are ignored.
                if possible_dep is not candidate and any(
                    cls._references_type(possible_dep, field.annotation)
                    for field in candidate.model_fields.values()
                    if field.annotation
                ):
                    dependents[possible_dep].add(candidate)
        return dependents

    @classmethod
    def _find_reachable(
        cls,
        target: type[BaseModel],
        dependents: dict[type[BaseModel], set[type[BaseModel]]],
    ) -> set[type[BaseModel]]:
        """Collects ``target`` and everything that transitively depends on it.

        :param target: The starting model class.
        :param dependents: Reverse-dependency map (model -> models using it).
        :returns: The set of visited models, which always includes ``target``.
        """
        # DFS traversal to find all transitively reachable dependent models.
        reachable: set[type[BaseModel]] = set()
        stack: list[type[BaseModel]] = [target]
        while stack:
            curr = stack.pop()
            # The visited check keeps the walk finite on cyclic references.
            if curr not in reachable:
                reachable.add(curr)
                stack.extend(dependents.get(curr, ()))
        return reachable

    @classmethod
    def _topological_sort(
        cls,
        subgraph_nodes: set[type[BaseModel]],
        dependents: dict[type[BaseModel], set[type[BaseModel]]],
    ) -> list[type[BaseModel]]:
        """Orders ``subgraph_nodes`` so that referenced models precede their users.

        Edges leading outside ``subgraph_nodes`` are ignored. Models that are
        ready at the same time are emitted in ascending ``__name__`` order.
        Models that never become ready (members of a reference cycle) are
        appended at the end, sorted by ``__name__``.

        :param subgraph_nodes: The models to order.
        :param dependents: Reverse-dependency map (model -> models using it).
        :returns: Every model of ``subgraph_nodes`` exactly once, in rebuild order.
        """
        # Kahn's algorithm: sort dependents to rebuild parent schemas in order.
        in_degree: dict[type[BaseModel], int] = dict.fromkeys(subgraph_nodes, 0)
        subgraph_dependents: dict[type[BaseModel], set[type[BaseModel]]] = {
            node: set() for node in subgraph_nodes
        }

        # Restrict the edges to the subgraph and count incoming edges per node.
        for node_u in subgraph_nodes:
            for node_v in dependents.get(node_u, ()):
                if node_v in subgraph_nodes:
                    subgraph_dependents[node_u].add(node_v)
                    in_degree[node_v] += 1

        queue: list[type[BaseModel]] = [
            node for node, deg in in_degree.items() if deg == 0
        ]
        ordered: list[type[BaseModel]] = []

        while queue:
            # Sort lexicographically by name to ensure a stable, deterministic order.
            queue.sort(key=lambda model: model.__name__)
            curr = queue.pop(0)
            ordered.append(curr)

            # Emitting ``curr`` satisfies one incoming edge of each of its users.
            for neighbor in subgraph_dependents[curr]:
                in_degree[neighbor] -= 1
                if in_degree[neighbor] == 0:
                    queue.append(neighbor)

        # Fallback for cyclic/recursive schemas: append remaining nodes alphabetically.
        if len(ordered) < len(subgraph_nodes):
            remaining = sorted(
                subgraph_nodes - set(ordered),
                key=lambda model: model.__name__,
            )
            ordered.extend(remaining)

        return ordered

    @classmethod
    def _references_type(cls, target: type, candidate: Any) -> bool:
        """Reports whether the annotation ``candidate`` refers to ``target``.

        A match is any of: the same object; a string equal to the target's
        ``__name__`` or ending in ``".<name>"``; a plain class that is a
        subclass of ``target``; a generic whose origin is a subclass of
        ``target``; or a generic with any type argument that matches
        recursively.

        :param target: The class being searched for.
        :param candidate: An annotation, or a fragment of one, to inspect.
        :returns: True if ``candidate`` references ``target``, else False.
        """
        # Recursively check types, postponed annotations, and generic arguments.
        if target is candidate:
            return True

        # Match postponed string annotations (e.g., "ChildModel").
        if isinstance(candidate, str):
            return candidate == target.__name__ or candidate.endswith(
                f".{target.__name__}"
            )

        origin = get_origin(candidate)
        # No origin means ``candidate`` is not a parameterised generic.
        if origin is None:
            return isinstance(candidate, type) and issubclass(candidate, target)

        if isinstance(origin, type) and issubclass(origin, target):
            return True

        return any(cls._references_type(target, arg) for arg in get_args(candidate))

reload_parent_schemas() classmethod

Traverses subclasses and rebuilds all dependent parent schemas.

This method scans all active subclasses of BaseModel in the runtime registry, identifies which of those models reference the current model class (or any of its parent classes in its MRO), and triggers a topological rebuild of those dependents.

.. code-block:: python

ReloadableBaseModel.reload_parent_schemas()

:returns: None.

Source code in src/disdantic/model.py
@classmethod
def reload_parent_schemas(cls) -> None:
    """Rebuild every loaded model that depends on this class or its bases.

    The ``BaseModel`` subclass tree is walked iteratively to collect
    candidate models (everything except ``cls`` whose defining module is
    still present in ``sys.modules``). Then, for each model class in this
    class's MRO other than ``BaseModel`` and ``ReloadableBaseModel``, the
    candidates referencing it are rebuilt via ``_rebuild_dependents``.

    .. code-block:: python

        ReloadableBaseModel.reload_parent_schemas()

    :returns: None.
    """
    candidates: set[type[BaseModel]] = set()
    pending: list[type[BaseModel]] = [BaseModel]

    while pending:
        for child in pending.pop().__subclasses__():
            # Never treat the class being reloaded as its own parent, and
            # visit each class only once.
            if child is cls or child in candidates:
                continue
            # Only keep classes whose defining module is still imported.
            module_name = getattr(child, "__module__", None)
            if not isinstance(module_name, str) or module_name not in sys.modules:
                continue
            candidates.add(child)
            pending.append(child)

    for base in cls.__mro__:
        # The framework bases carry no user fields worth cascading from.
        if base is BaseModel or base is ReloadableBaseModel:
            continue
        if isinstance(base, type) and issubclass(base, BaseModel):
            cls._rebuild_dependents(base, candidates)

reload_schema(parents=True) classmethod

Forces a compilation rebuild of the local core schema.

This method triggers Pydantic's underlying model rebuilding process for the target model, forcing a compilation of its core schema. If parent cascading is requested and enabled globally, it also traverses the dependency tree to rebuild all models referencing this target model.

.. code-block:: python

ReloadableBaseModel.reload_schema(parents=True)

:param parents: Specifies whether schema updates should propagate to dependent parent models. :returns: None.

Source code in src/disdantic/model.py
@classmethod
def reload_schema(cls, parents: bool = True) -> None:
    """Force a rebuild of this model's core schema, optionally cascading.

    The rebuild is skipped entirely when the ``enable_schema_rebuilding``
    setting is off. Otherwise the model is rebuilt with
    ``model_rebuild(force=True)`` and, when both ``parents`` and the
    ``schema_rebuild_parents`` setting are truthy, models that reference
    it are rebuilt as well.

    .. code-block:: python

        ReloadableBaseModel.reload_schema(parents=True)

    :param parents: Specifies whether schema updates should propagate to
        dependent parent models.
    :returns: None.
    """
    config = get_settings()

    if config.enable_schema_rebuilding:
        cls.model_rebuild(force=True)
        # The caller's flag is checked first, then the global setting.
        if parents and config.schema_rebuild_parents:
            cls.reload_parent_schemas()

Settings

Bases: BaseSettings

Central configuration store and validation schema for the disdantic package.

This class serves as the single source of truth for runtime configurations, defining default options and loading overrides dynamically across modules. It integrates with Pydantic's BaseSettings to enforce type validation, coercion, and environment prefixing.

Example

.. code-block:: python

from disdantic.settings import Settings, get_settings

# Initialize a localized settings instance
settings = Settings(default_schema_discriminator="custom_type")
assert settings.default_schema_discriminator == "custom_type"

# Retrieve the global settings singleton instance
global_settings = get_settings()
print(global_settings.project_root)
Source code in src/disdantic/settings.py
class Settings(BaseSettings):
    """Central configuration store and validation schema for the disdantic package.

    This class serves as the single source of truth for runtime configurations,
    defining default options and loading overrides dynamically across modules.
    It integrates with Pydantic's BaseSettings to enforce type validation,
    coercion, and environment prefixing.

    Example:
        .. code-block:: python

            from disdantic.settings import Settings, get_settings

            # Initialize a localized settings instance
            settings = Settings(default_schema_discriminator="custom_type")
            assert settings.default_schema_discriminator == "custom_type"

            # Retrieve the global settings singleton instance
            global_settings = get_settings()
            print(global_settings.project_root)
    """

    model_config: ClassVar[SettingsConfigDict] = SettingsConfigDict(
        arbitrary_types_allowed=True,
        extra="ignore",
        populate_by_name=True,
        validate_assignment=True,
        env_prefix="DISDANTIC__",
        cli_prefix="disdantic_",
        cli_parse_args=True,
        pyproject_toml_table_header=("tool", "disdantic"),
    )
    """Configuration dictionary defining Pydantic Settings behavior options.

    Controls environment variables parsing prefix, CLI integration arguments,
    pyproject.toml headers, and assignment validation rules.
    """

    # Core Application Properties
    # Defaults to the current working directory at instantiation time.
    project_root: Path = Field(
        default_factory=Path.cwd,
        description=(
            "Maps the file system paths and resolves relative configuration files."
        ),
    )

    # Registry & Serialization Settings
    default_schema_discriminator: str = Field(
        default="model_type",
        description=(
            "Maps and retrieves model types within Pydantic registries using this "
            "key name."
        ),
    )
    registry_auto_discovery: bool = Field(
        default=False,
        description=(
            "Enables automatic scanning and loading of subclasses during registry "
            "initialization."
        ),
    )

    # Auto Import Settings
    auto_packages: list[str] = Field(
        default_factory=list,
        description=(
            "Configures target package namespaces to scan, enabling dynamic model "
            "discovery across submodules."
        ),
    )
    auto_ignore_modules: list[str] = Field(
        default_factory=list,
        description=(
            "Configures submodule import paths to skip, mapping excluded modules "
            "during dynamic scanning."
        ),
    )

    # Schema Compilation Settings
    enable_schema_rebuilding: bool = Field(
        default=True,
        description=(
            "Enables dynamic rebuilding of validation schemas, triggering Pydantic "
            "model schema rebuilds."
        ),
    )
    schema_rebuild_parents: bool = Field(
        default=True,
        description=(
            "Enables parent propagation, mapping validation schema updates up the "
            "subclass MRO hierarchy."
        ),
    )

    # Introspection Settings
    info_exclude_keys: list[str] = Field(
        default_factory=lambda: ["info"],
        description=(
            "Maps specific fields to skip and exclude during InfoMixin logging."
        ),
    )

    @classmethod
    def settings_customise_sources(
        cls,
        settings_cls: type[BaseSettings],
        init_settings: PydanticBaseSettingsSource,
        env_settings: PydanticBaseSettingsSource,
        dotenv_settings: PydanticBaseSettingsSource,
        file_secret_settings: PydanticBaseSettingsSource,
    ) -> tuple[PydanticBaseSettingsSource, ...]:
        """Customize configuration loaders and define the priority hierarchy.

        This method overrides the default Pydantic source loading priorities
        to determine the resolving order of configuration options. The
        returned order is: constructor arguments, environment variables,
        dotenv file, ``pyproject.toml`` and finally CLI arguments. The secrets
        file source is accepted but not included in the result.

        :param settings_cls: The settings class being constructed.
        :param init_settings: Settings passed directly to the class constructor.
        :param env_settings: Settings loaded from system environment variables.
        :param dotenv_settings: Settings loaded from active .env file streams.
        :param file_secret_settings: Settings loaded from secrets files paths
            (unused).
        :returns: A tuple of settings sources ordered by loading priority.
        """
        _ = (file_secret_settings,)  # Allow unused variable to satisfy lint/format
        # Only a ``project_root`` passed to the constructor is consulted when
        # locating pyproject.toml; otherwise the current directory is used.
        input_args = init_settings()
        project_root = Path(input_args.get("project_root") or Path.cwd())

        # NOTE(review): pydantic-settings is documented to give earlier tuple
        # entries precedence, which would make the CLI the lowest-priority
        # source here - confirm this is intended.
        return (
            init_settings,
            env_settings,
            dotenv_settings,
            PyprojectTomlConfigSettingsSource(
                settings_cls, toml_file=project_root / "pyproject.toml"
            ),
            CliSettingsSource(
                settings_cls,
                cli_ignore_unknown_args=True,
                cli_parse_args=True,
                cli_prefix=settings_cls.model_config.get("cli_prefix", ""),
            ),
        )

    def __str__(self) -> str:
        """Generate a concise string representation of the settings.

        Only ``project_root`` is included; the other fields are omitted.

        :returns: A human-readable string summary of the configuration state.
        """
        return f"Settings(project_root={self.project_root!r})"

    def __repr__(self) -> str:
        """Generate the debug string representation of the settings.

        The output is the same as ``__str__``: only ``project_root`` is shown.

        :returns: A debug string representation of the settings.
        """
        return f"Settings(project_root={self.project_root!r})"

model_config = SettingsConfigDict(arbitrary_types_allowed=True, extra='ignore', populate_by_name=True, validate_assignment=True, env_prefix='DISDANTIC__', cli_prefix='disdantic_', cli_parse_args=True, pyproject_toml_table_header=('tool', 'disdantic')) class-attribute

Configuration dictionary defining Pydantic Settings behavior options.

Controls environment variables parsing prefix, CLI integration arguments, pyproject.toml headers, and assignment validation rules.

__repr__()

Generate a detailed string representation of the settings.

:returns: A detailed debug string representation of the settings.

Source code in src/disdantic/settings.py
def __repr__(self) -> str:
    """Generate the debug string representation of the settings.

    Only ``project_root`` is included in the output; the remaining fields
    are omitted.

    :returns: A debug string of the form ``Settings(project_root=...)``.
    """
    return f"Settings(project_root={self.project_root!r})"

__str__()

Generate a concise string representation of the settings.

:returns: A human-readable string summary of the configuration state.

Source code in src/disdantic/settings.py
def __str__(self) -> str:
    """Generate a concise string representation of the settings.

    Only ``project_root`` is included in the output; the remaining fields
    are omitted.

    :returns: A human-readable summary of the form
        ``Settings(project_root=...)``.
    """
    return f"Settings(project_root={self.project_root!r})"

settings_customise_sources(settings_cls, init_settings, env_settings, dotenv_settings, file_secret_settings) classmethod

Customize configuration loaders and define the priority hierarchy.

This method overrides the default Pydantic source loading priorities to determine the resolving order of configuration options.

:param settings_cls: The settings class being constructed.
:param init_settings: Settings passed directly to the class constructor.
:param env_settings: Settings loaded from system environment variables.
:param dotenv_settings: Settings loaded from active .env file streams.
:param file_secret_settings: Settings loaded from secrets files paths.
:returns: A tuple of settings sources ordered by loading priority.

Source code in src/disdantic/settings.py
@classmethod
def settings_customise_sources(
    cls,
    settings_cls: type[BaseSettings],
    init_settings: PydanticBaseSettingsSource,
    env_settings: PydanticBaseSettingsSource,
    dotenv_settings: PydanticBaseSettingsSource,
    file_secret_settings: PydanticBaseSettingsSource,
) -> tuple[PydanticBaseSettingsSource, ...]:
    """Select the configuration sources and the order they are consulted in.

    The result lists, in order: constructor arguments, environment
    variables, the dotenv file, the ``pyproject.toml`` found under the
    project root, and CLI arguments. The secrets-file source is dropped.

    :param settings_cls: The settings class being constructed.
    :param init_settings: Settings passed directly to the class constructor.
    :param env_settings: Settings loaded from system environment variables.
    :param dotenv_settings: Settings loaded from active .env file streams.
    :param file_secret_settings: Settings loaded from secrets files paths
        (unused).
    :returns: A tuple of settings sources ordered by loading priority.
    """
    del file_secret_settings  # Accepted for interface compatibility only.

    # pyproject.toml is looked up under an explicitly passed project root,
    # falling back to the current working directory.
    constructor_values = init_settings()
    root_dir = Path(constructor_values.get("project_root") or Path.cwd())

    pyproject_source = PyprojectTomlConfigSettingsSource(
        settings_cls, toml_file=root_dir / "pyproject.toml"
    )
    cli_source = CliSettingsSource(
        settings_cls,
        cli_ignore_unknown_args=True,
        cli_parse_args=True,
        cli_prefix=settings_cls.model_config.get("cli_prefix", ""),
    )

    return (
        init_settings,
        env_settings,
        dotenv_settings,
        pyproject_source,
        cli_source,
    )

SingletonMeta

Bases: type

Thread-safe Singleton metaclass.

This metaclass intercepts class instantiation to enforce the singleton pattern, ensuring that subsequent calls to the class constructor return the same cached instance. It uses double-checked locking to guarantee thread safety during initialization without compromising read performance on subsequent retrievals.

Example

.. code-block:: python

from disdantic.singleton import SingletonMeta

class DatabaseConnection(metaclass=SingletonMeta):
    def __init__(self, connection_string: str) -> None:
        self.connection_string = connection_string

# Both variables reference the exact same instance
conn1 = DatabaseConnection("db://host1")
conn2 = DatabaseConnection("db://host2")
assert conn1 is conn2
Source code in src/disdantic/singleton.py
class SingletonMeta(type):
    """
    Thread-safe Singleton metaclass.

    This metaclass intercepts class instantiation to enforce the singleton
    pattern, ensuring that subsequent calls to the class constructor return
    the same cached instance. It uses double-checked locking to guarantee
    thread safety during initialization without compromising read performance
    on subsequent retrievals.

    The instance cache and the lock are shared by every class that uses this
    metaclass. The lock is re-entrant so that a singleton whose ``__init__``
    constructs a *different* singleton on the same thread does not deadlock.

    Example:
        .. code-block:: python

            from disdantic.singleton import SingletonMeta

            class DatabaseConnection(metaclass=SingletonMeta):
                def __init__(self, connection_string: str) -> None:
                    self.connection_string = connection_string

            # Both variables reference the exact same instance
            conn1 = DatabaseConnection("db://host1")
            conn2 = DatabaseConnection("db://host2")
            assert conn1 is conn2
    """

    # Cached instances keyed by their class; shared across all singleton classes.
    _instances: dict[type, Any] = {}
    # Re-entrant: construction of one singleton may trigger construction of
    # another on the same thread while the lock is still held.
    _lock: threading.RLock = threading.RLock()

    @classmethod
    def clear_all_singletons(cls) -> None:
        """
        Wipes all cached singleton instances across the application workspace.

        This method is primarily designed for cleanup between test runs, ensuring
        that side effects from singleton states do not leak across test boundaries.
        """
        with cls._lock:
            cls._instances.clear()

    def __call__(cls, *args: Any, **kwargs: Any) -> Any:
        """
        Intercepts class instantiation to return the cached singleton instance.

        If the instance does not already exist, it is created using double-checked
        locking to handle initialization race conditions. Constructor arguments
        are only used by the call that actually creates the instance.

        :param args: Positional arguments passed to the class constructor.
        :param kwargs: Keyword arguments passed to the class constructor.
        :returns: The single, thread-safe cached instance of the class.
        """
        # Fast Path: a single lookup without the lock. Using one lookup (rather
        # than a membership test followed by a second read) means a concurrent
        # ``clear_instances`` cannot cause a KeyError in between.
        try:
            return cls._instances[cls]
        except KeyError:
            pass

        with cls._lock:
            # Slow Path: Double-check within the lock context to handle
            # race conditions.
            if cls not in cls._instances:
                cls._instances[cls] = super().__call__(*args, **kwargs)
            # Read while still holding the lock so the entry cannot be evicted
            # before it is returned.
            return cls._instances[cls]

    def clear_instances(cls) -> None:
        """
        Evicts the active single instance of the class from runtime tracking.

        Calling this method allows a new instance of the class to be created on the
        next instantiation attempt. It is a no-op when no instance is cached.
        """
        with cls._lock:
            cls._instances.pop(cls, None)

__call__(*args, **kwargs)

Intercepts class instantiation to return the cached singleton instance.

If the instance does not already exist, it is created using double-checked locking to handle initialization race conditions.

:param args: Positional arguments passed to the class constructor. :param kwargs: Keyword arguments passed to the class constructor. :returns: The single, thread-safe cached instance of the class.

Source code in src/disdantic/singleton.py
def __call__(cls, *args: Any, **kwargs: Any) -> Any:
    """
    Intercepts class instantiation to return the cached singleton instance.

    If the instance does not already exist, it is created using double-checked
    locking to handle initialization race conditions. Constructor arguments
    are only used by the call that actually creates the instance.

    :param args: Positional arguments passed to the class constructor.
    :param kwargs: Keyword arguments passed to the class constructor.
    :returns: The single, thread-safe cached instance of the class.
    """
    # Fast Path: a single lookup without the lock. A membership test followed
    # by a separate read could raise KeyError if the entry were evicted by
    # another thread in between.
    try:
        return cls._instances[cls]
    except KeyError:
        pass

    with cls._lock:
        # Slow Path: Double-check within the lock context to handle
        # race conditions.
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        # Read while still holding the lock so the entry cannot be evicted
        # before it is returned.
        return cls._instances[cls]

clear_all_singletons() classmethod

Wipes all cached singleton instances across the application workspace.

This method is primarily designed for cleanup between test runs, ensuring that side effects from singleton states do not leak across test boundaries.

Source code in src/disdantic/singleton.py
@classmethod
def clear_all_singletons(cls) -> None:
    """
    Drop every cached singleton instance, for all classes at once.

    Intended mainly for test teardown so that singleton state created by one
    test cannot leak into the next. The cache is emptied while holding the
    lock.
    """
    with cls._lock:
        cache = cls._instances
        cache.clear()

clear_instances()

Evicts the active single instance of the class from runtime tracking.

Calling this method allows a new instance of the class to be created on the next instantiation attempt.

Source code in src/disdantic/singleton.py
def clear_instances(cls) -> None:
    """
    Forget the cached instance of this class, if there is one.

    After this call the next instantiation attempt builds a fresh instance.
    Cached instances of other classes are left untouched, and nothing happens
    when this class has no cached instance.
    """
    with cls._lock:
        if cls in cls._instances:
            del cls._instances[cls]

configure_logger(settings=None, **default_overrides)

Configure the global Loguru logger based on settings.

This function initializes or updates the active logger handler. If no settings are provided, it loads settings from environment variables. It enables interception of standard library log statements and configures formatting, sinks, filtering, and queue options.

Example

.. code-block:: python

from disdantic.logging import LoggingSettings, configure_logger

configure_logger(
    settings=LoggingSettings(level="INFO"),
    clear_loggers=True
)

:param settings: Logging configurations, defaults to None (loads from environment). :param default_overrides: Parameter overrides merged into env settings if settings is None. :returns: None. :raises ImportError: Raised if OpenTelemetry formatting is enabled but the package is not installed.

Source code in src/disdantic/logging.py
def _merge_env_logging_settings(default_overrides: dict[str, Any]) -> LoggingSettings:
    """Build settings from a default-constructed ``LoggingSettings`` plus overrides.

    Fields that were explicitly set on the default-constructed instance win
    over ``default_overrides``; the overrides only fill in the remaining fields.
    """
    env_settings = LoggingSettings()
    merged = dict(default_overrides)
    for field in env_settings.model_fields_set:
        merged[field] = getattr(env_settings, field)
    return LoggingSettings(**merged)


def _remove_managed_handlers(clear_loggers: bool) -> None:
    """Remove previously installed handlers before a new one is added.

    With ``clear_loggers`` every handler is removed except those wrapping a
    ``PropagateHandler``; otherwise only the handler recorded in ``_state`` is
    removed.
    """
    if clear_loggers:
        # A mocked logger has no real ``_core``; fall back to a plain remove().
        if hasattr(logger, "_mock_name") or type(logger).__name__ in (
            "MagicMock",
            "Mock",
        ):
            logger.remove()
        else:
            for handler_id, handler in list(cast("Any", logger)._core.handlers.items()):  # noqa: SLF001
                sink = getattr(handler, "_sink", None)
                handler_obj = getattr(sink, "_handler", None)
                # Leave PropagateHandler sinks installed.
                if handler_obj and type(handler_obj).__name__ == "PropagateHandler":
                    continue
                # The handler may already be gone; that is not an error here.
                with contextlib.suppress(ValueError):
                    logger.remove(handler_id)
        _state["handler_id"] = None
    elif isinstance(_state["handler_id"], int):
        with contextlib.suppress(ValueError):
            logger.remove(_state["handler_id"])
        _state["handler_id"] = None


def _resolve_log_filter(filter_setting: Any) -> Any:
    """Translate the ``filter`` setting into a value accepted by ``logger.add``.

    ``True`` becomes the ``"disdantic"`` name filter, ``False`` becomes
    ``None``, a list/tuple becomes a predicate matching record names by
    prefix, and anything else is passed through unchanged.
    """
    filter_val = "disdantic" if filter_setting is True else filter_setting

    if isinstance(filter_val, str):
        return filter_val

    if isinstance(filter_val, (list, tuple)):
        prefixes = tuple(filter_val)

        def final_filter(record: dict[str, Any]) -> bool:
            return bool(record["name"] and record["name"].startswith(prefixes))

        return final_filter

    return None if filter_val is False else filter_val


def configure_logger(
    settings: LoggingSettings | None = None,
    **default_overrides: Any,
) -> None:
    """
    Configure the global Loguru logger based on settings.

    This function initializes or updates the active logger handler. If no
    settings are provided, it loads settings from environment variables.
    It enables interception of standard library log statements and configures
    formatting, sinks, filtering, and queue options.

    When logging is disabled in the settings, the ``disdantic`` logger is
    disabled, interception is switched off, and no handler is touched.

    Example:
        .. code-block:: python

            from disdantic.logging import LoggingSettings, configure_logger

            configure_logger(
                settings=LoggingSettings(level="INFO"),
                clear_loggers=True
            )

    :param settings: Logging configurations, defaults to None (loads from environment).
    :param default_overrides: Parameter overrides merged into env settings if
                              settings is None.
    :returns: None.
    :raises ImportError: Raised if OpenTelemetry formatting is enabled but the
                         package is not installed. The check runs before any
                         logger state is modified.
    """
    if settings is None:
        settings = _merge_env_logging_settings(default_overrides)

    if not settings.enabled:
        logger.disable("disdantic")
        intercept_standard_logging(False)
        return

    # Fail fast: validate before enabling interception or removing handlers,
    # so an invalid configuration does not leave the logger without a sink.
    if settings.otel_formatting == "enable" and opentelemetry_trace is None:
        raise ImportError(
            "OpenTelemetry is not installed but 'otel_formatting' was set to 'enable'."
        )

    logger.enable("disdantic")
    intercept_standard_logging(True)

    _remove_managed_handlers(settings.clear_loggers)

    use_otel = settings.otel_formatting == "enable" or (
        settings.otel_formatting == "auto" and opentelemetry_trace is not None
    )
    if use_otel:
        # The OTel sink wraps the configured sink and receives the bare message.
        sink_val = OtelSink(settings.sink)
        log_format = "{message}\n"
    else:
        sink_val = settings.sink
        log_format = settings.format

    final_filter = _resolve_log_filter(settings.filter)

    # Resolve "auto" log level
    level_val = "WARNING" if settings.level == "auto" else settings.level

    # Remember the handler id so the next call can replace this handler.
    _state["handler_id"] = logger.add(
        sink=cast("Any", sink_val),
        level=level_val,
        filter=cast("Any", final_filter),
        format=cast("Any", log_format),
        enqueue=settings.enqueue,
        **settings.kwargs,
    )

get_registry_schema(registry_class, *, format='json')

Generates the schema for the specified registry base class.

Example

.. code-block:: python

from disdantic.schema import get_registry_schema
from myapp.registry import MyRegistry

# Generate OpenAPI schema for the registry
schema = get_registry_schema(MyRegistry, format="openapi")

:param registry_class: The registry base class subclassing PydanticClassRegistryMixin.
:param format: The output format, either 'json' or 'openapi'.
:raises TypeError: If the registry_class is not a subclass of PydanticClassRegistryMixin.
:returns: A dictionary representing the generated schema.

Source code in src/disdantic/schema.py
def get_registry_schema(
    registry_class: type[PydanticClassRegistryMixin],
    *,
    format: SchemaFormat = "json",  # noqa: A002
) -> dict[str, Any]:
    """Build the schema dictionary describing a registry base class.

    The registry (and, through it, its registered models) is force-rebuilt
    before the schema is produced. For the ``"openapi"`` format, references
    are emitted with the ``#/components/schemas/{model}`` template and any
    top-level ``$defs`` section is relocated to ``components.schemas``. Any
    other format value yields the plain JSON schema.

    Example:
        .. code-block:: python

            from disdantic.schema import get_registry_schema
            from myapp.registry import MyRegistry

            # Generate OpenAPI schema for the registry
            schema = get_registry_schema(MyRegistry, format="openapi")

    :param registry_class: The registry base class subclassing
        PydanticClassRegistryMixin.
    :param format: The output format, either 'json' or 'openapi'.
    :raises TypeError: If the registry_class is not a subclass of
        PydanticClassRegistryMixin.
    :returns: A dictionary representing the generated schema.
    """
    is_class = isinstance(registry_class, type)
    if not (is_class and issubclass(registry_class, PydanticClassRegistryMixin)):
        # Report the class name for classes, or the instance's type otherwise.
        offender = registry_class if is_class else type(registry_class)
        raise TypeError(
            "Expected a subclass of PydanticClassRegistryMixin, "
            f"got {offender.__name__}"
        )

    # Refresh the registry so the schema reflects every registered model.
    registry_class.model_rebuild(force=True)

    if format != "openapi":
        return registry_class.model_json_schema()

    openapi_schema = registry_class.model_json_schema(
        ref_template="#/components/schemas/{model}"
    )
    if "$defs" in openapi_schema:
        # Move the definitions to where the reference template points.
        openapi_schema["components"] = {"schemas": openapi_schema.pop("$defs")}
    return openapi_schema

get_settings()

Retrieve the global Settings singleton instance.

Uses double-checked locking to resolve initialization race conditions in multi-threaded applications.

Example

.. code-block:: python

from disdantic.settings import get_settings

settings = get_settings()
print(settings.project_root)

:returns: The global Settings singleton instance.

Source code in src/disdantic/settings.py
def get_settings() -> Settings:
    """Retrieve the global Settings singleton instance.

    Uses double-checked locking to resolve initialization race conditions
    in multi-threaded applications. The singleton is captured in a local
    variable and that local is what gets returned, so a ``reset_settings()``
    call racing with this function can never cause ``None`` to be returned.

    Example:
        .. code-block:: python

            from disdantic.settings import get_settings

            settings = get_settings()
            print(settings.project_root)

    :returns: The global Settings singleton instance.
    """
    global _global_settings  # noqa: PLW0603
    # Snapshot the global once. Re-reading it for the return statement would
    # let a concurrent reset slip a None in between the check and the return.
    current = _global_settings
    if current is None:
        with _settings_lock:
            # Second check: another thread may have initialized the singleton
            # while this one was waiting for the lock.
            current = _global_settings
            if current is None:
                current = Settings()
                _global_settings = current
    return current

reset_settings()

Reset the global Settings singleton instance to None.

Forces a complete reload and re-validation of settings on the next invocation of get_settings().

Example

.. code-block:: python

from disdantic.settings import reset_settings

# Evicts active settings from memory
reset_settings()

:returns: None

Source code in src/disdantic/settings.py
def reset_settings() -> None:
    """Reset the global Settings singleton instance to None.

    The cached instance is cleared while holding the settings lock. This
    forces a complete reload and re-validation of settings on the next
    invocation of get_settings(), which constructs a new Settings object
    whenever the singleton is None.

    Example:
        .. code-block:: python

            from disdantic.settings import reset_settings

            # Evicts active settings from memory
            reset_settings()

    :returns: None
    """
    global _global_settings  # noqa: PLW0603
    # Take the same lock get_settings() initializes under, so the reset cannot
    # interleave with a lazy initialization that is in progress.
    with _settings_lock:
        _global_settings = None

verify_registries(settings=None)

Scan configured packages, discover registries, and verify compilation.

Examples:

.. code-block:: python

from disdantic.diagnose import verify_registries

report = verify_registries()
if not report.is_healthy:
    print(f"Diagnostics failed. Errors: {report.import_errors}")

:param settings: Optional settings configuration instance to customize packages and ignore rules.
:returns: The aggregated health diagnostics report.

Source code in src/disdantic/diagnose.py
def verify_registries(settings: Settings | None = None) -> DiagnosticsReport:
    """Scan configured packages, discover registries, and verify compilation.

    When ``settings`` is supplied it is installed as the global settings
    singleton before scanning, so it remains active after this call returns.
    The report is healthy only if no import errors were collected, every
    Pydantic-backed registry rebuilds and produces a JSON schema without
    raising, and no diagnosed model has a compilation status of ``"error"``.

    Examples:
        .. code-block:: python

            from disdantic.diagnose import verify_registries

            report = verify_registries()
            if not report.is_healthy:
                print(f"Diagnostics failed. Errors: {report.import_errors}")

    :param settings: Optional settings configuration instance to customize packages
        and ignore rules.
    :returns: The aggregated health diagnostics report.
    """
    if settings is not None:
        # Replace the singleton under its lock so get_settings() returns it.
        with disdantic.settings._settings_lock:  # noqa: SLF001
            disdantic.settings._global_settings = settings  # noqa: SLF001

    active_settings = disdantic.settings.get_settings()
    packages = _resolve_packages_to_scan(active_settings)
    ignored = _resolve_ignore_set(active_settings)

    import_errors: list[str] = []
    _scan_packages(packages, ignored, import_errors)

    healthy = not import_errors
    diagnostics: list[RegistryDiagnostics] = []

    for registry_cls in _find_valid_registries():
        diag = _diagnose_registry(registry_cls, import_errors)
        diagnostics.append(diag)

        if issubclass(registry_cls, PydanticClassRegistryMixin):
            # A registry that cannot rebuild or emit a schema is unhealthy.
            try:
                registry_cls.model_rebuild(force=True, raise_errors=True)
                registry_cls.model_json_schema()
            except Exception:  # noqa: BLE001
                healthy = False

        statuses = [model.compilation_status for model in diag.models]
        if "error" in statuses:
            healthy = False

    # Diagnosing registries may have appended further import errors.
    return DiagnosticsReport(
        is_healthy=healthy and not import_errors,
        scanned_packages=packages,
        registries=diagnostics,
        import_errors=import_errors,
    )