Coverage for src / rustarium / logging.py: 65%

77 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-14 22:20 +0000

1""" 

2Loguru-based logging configuration and environment settings for rustarium. 

3 

4This module provides a unified interface for configuring application-level logging 

5using loguru and Pydantic settings. It handles dynamic OpenTelemetry formatting 

6across the codebase and build environments. 

7""" 

8 

9from __future__ import annotations 

10 

11import contextlib 

12import json 

13import sys 

14import traceback 

15from collections.abc import Callable 

16from typing import Any, ClassVar, Literal 

17 

18from loguru import logger 

19from pydantic import Field, field_validator 

20from pydantic_settings import BaseSettings, SettingsConfigDict 

21 

22from rustarium.compat import opentelemetry_trace 

23 

# Names exported by ``from rustarium.logging import *``.
__all__ = ["LoggingSettings", "configure_logger", "logger"]

# ID returned by the most recent ``logger.add()`` call made in
# ``configure_logger``; reset to ``None`` whenever that handler is removed.
_HANDLER_ID: int | None = None

27 

28 

class LoggingSettings(BaseSettings):
    """
    Settings model for configuring the loguru logging infrastructure.

    This Pydantic model loads configuration from environment variables prefixed
    with ``RUSTARIUM__LOGGING__`` and provides typed fields for controlling
    log output, formatting, and OpenTelemetry integration.

    The default ``sink`` is resolved when the settings object is created (not
    when this module is imported), so it follows any replacement of
    ``sys.stdout`` that happened in between.

    Example:
        .. code-block:: python

            from rustarium.logging import LoggingSettings, configure_logger

            settings = LoggingSettings(enabled=True, level="DEBUG")
            configure_logger(settings)
    """

    enabled: bool = Field(
        default=False,
        description=(
            "Whether to enable rustarium loguru logging across the application."
        ),
    )
    clear_loggers: bool = Field(
        default=False,
        description=(
            "If true, removes all existing loguru handlers before configuring new ones."
        ),
    )
    sink: str | Any = Field(
        # Use a factory rather than ``default=sys.stdout``: a plain default
        # would capture the stream object that existed at import time, which
        # goes stale if ``sys.stdout`` is later swapped (test capture,
        # ``contextlib.redirect_stdout``, re-wrapped streams). This matches
        # ``_parse_sink``, which also looks the stream up at validation time.
        default_factory=lambda: sys.stdout,
        description=(
            "The output sink for log messages. Can be an object or string "
            "alias ('stdout')."
        ),
    )
    level: str = Field(
        default="INFO",
        description="The minimum severity level for emitted log messages.",
    )
    otel_formatting: Literal["auto", "enable", "disable"] = Field(
        default="auto",
        description=(
            "Controls OpenTelemetry JSON formatting. 'auto' enables it if "
            "otel is installed."
        ),
    )
    format: str | Callable[..., Any] | None = Field(
        default="<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | "
        "<cyan>{name}</cyan>:<cyan>{function}</cyan> - <level>{message}</level>\n",
        description=(
            "The log format string or function to use when otel formatting is disabled."
        ),
    )
    filter: Any = Field(
        default=True,
        description=(
            "Filters log records. Defaults to True to filter by the 'rustarium' prefix."
        ),
    )
    enqueue: bool = Field(
        default=True,
        description="Whether to enable thread-safe asynchronous logging.",
    )
    kwargs: dict[str, Any] = Field(
        default_factory=dict,
        description=(
            "Additional keyword arguments to pass directly to loguru's add() method."
        ),
    )

    model_config: ClassVar[SettingsConfigDict] = SettingsConfigDict(
        env_prefix="RUSTARIUM__LOGGING__",
        env_nested_delimiter="__",
    )
    """Pydantic configuration dict dictating environment variable prefixes."""

    @field_validator("sink", mode="before")
    @classmethod
    def _parse_sink(cls, value: Any) -> Any:
        """
        Translate string aliases for the standard streams into stream objects.

        :param value: The raw ``sink`` value supplied by the caller or environment.
        :return: ``sys.stdout`` / ``sys.stderr`` when ``value`` is one of the
            case-insensitive aliases ``stdout``, ``sys.stdout``, ``stderr`` or
            ``sys.stderr``; otherwise ``value`` unchanged.
        """
        if not isinstance(value, str):
            return value

        # Built per call so the aliases map to the *current* standard streams.
        aliases = {
            "stdout": sys.stdout,
            "sys.stdout": sys.stdout,
            "stderr": sys.stderr,
            "sys.stderr": sys.stderr,
        }
        # Unrecognised strings are passed through untouched.
        return aliases.get(value.lower(), value)

119 

120 

def _otel_formatter(record: dict[str, Any]) -> str:
    """
    Render a loguru record as a single-line, OpenTelemetry-style JSON document.

    The result is meant to be used as a loguru *format function*: loguru treats
    the returned string as a format template, so every brace in the JSON is
    doubled to make it come out literally.

    :param record: The loguru record dict. Reads ``time``, ``level``,
        ``message``, ``name``, ``function``, ``line``, ``extra`` and, when
        present, ``exception``.
    :return: The brace-escaped JSON text followed by a newline.
    """
    trace_id = span_id = trace_flags = None

    # Only consult the tracing API when the optional dependency was importable.
    if opentelemetry_trace:
        span = opentelemetry_trace.get_current_span()
        context = span.get_span_context()
        if context.is_valid:
            # Fixed-width lowercase hex: 32 chars trace id, 16 span id, 2 flags.
            trace_id = format(context.trace_id, "032x")
            span_id = format(context.span_id, "016x")
            trace_flags = format(context.trace_flags, "02x")

    log_record: dict[str, Any] = {
        "timestamp": record["time"].isoformat(),
        "severity_text": record["level"].name,
        "body": record["message"],
        "resource": {"service.name": "rustarium"},
        "attributes": {
            "module": record["name"],
            "function": record["function"],
            "line": record["line"],
            # Spread last, so keys in ``extra`` override the three above.
            **record["extra"],
        },
    }

    if record.get("exception"):
        exception = record["exception"]
        attributes = log_record["attributes"]
        attributes["exception.type"] = exception.type.__name__
        attributes["exception.message"] = str(exception.value)
        attributes["exception.stacktrace"] = "".join(
            traceback.format_exception(
                exception.type, exception.value, exception.traceback
            )
        )

    if trace_id:
        log_record.update(
            {
                "trace_id": trace_id,
                "span_id": span_id,
                "trace_flags": trace_flags,
            }
        )

    # ``extra`` values are caller-supplied and need not be JSON-serializable.
    # Fall back to ``str()`` for those instead of letting ``json.dumps`` raise
    # ``TypeError`` from inside the formatter, which would drop the log line.
    payload = json.dumps(log_record, default=str)

    # Escape braces so loguru doesn't interpret the JSON string as a format string
    return payload.replace("{", "{{").replace("}", "}}") + "\n"

167 

168 

def configure_logger(settings: LoggingSettings | None = None) -> None:
    """
    Initializes the loguru logger with the provided settings or from the environment.

    This function configures the global loguru logger instance based on the provided
    ``LoggingSettings``. It handles enabling/disabling the logger, managing sinks,
    and injecting the appropriate formatter (including OpenTelemetry).

    When ``settings.enabled`` is false the ``rustarium`` namespace is disabled
    and nothing else is changed. Otherwise the handler installed by a previous
    call (or every handler, if ``clear_loggers`` is set) is removed and a new
    one is added.

    The OpenTelemetry requirement is checked *before* any handler is removed,
    so a failing call leaves the existing logging setup intact.

    Example:
        .. code-block:: python

            from rustarium.logging import configure_logger, LoggingSettings

            configure_logger(LoggingSettings(level="DEBUG"))

    :param settings: An optional instance of ``LoggingSettings``. If not provided,
        settings are automatically loaded from the environment.
    :return: None
    :raises ImportError: If OpenTelemetry formatting is explicitly enabled but the
        package is not installed.
    """
    global _HANDLER_ID  # noqa: PLW0603

    settings = settings or LoggingSettings()

    if not settings.enabled:
        logger.disable("rustarium")
        return

    # Validate up front: raising after handlers were removed would leave the
    # process without any rustarium log output.
    otel_available = opentelemetry_trace is not None
    if settings.otel_formatting == "enable" and not otel_available:
        raise ImportError(
            "OpenTelemetry is not installed but 'otel_formatting' was set to 'enable'."
        )
    use_otel = settings.otel_formatting == "enable" or (
        settings.otel_formatting == "auto" and otel_available
    )

    logger.enable("rustarium")

    if settings.clear_loggers:
        # No argument: drop every handler, including ones we did not install.
        logger.remove()
        _HANDLER_ID = None
    elif isinstance(_HANDLER_ID, int):
        # The tracked handler may already have been removed elsewhere; the
        # ValueError from removing it again is deliberately ignored.
        with contextlib.suppress(ValueError):
            logger.remove(_HANDLER_ID)
        _HANDLER_ID = None

    log_format = _otel_formatter if use_otel else settings.format

    # ``True`` is shorthand for "only records from the rustarium namespace".
    filter_val = "rustarium" if settings.filter is True else settings.filter

    if isinstance(filter_val, (list, tuple)):
        # ``str.startswith`` accepts a tuple, so one call checks all prefixes.
        prefixes = tuple(filter_val)

        def final_filter(record: dict[str, Any]) -> bool:
            return bool(record["name"] and record["name"].startswith(prefixes))

    elif isinstance(filter_val, str):

        def final_filter(record: dict[str, Any]) -> bool:
            return bool(record["name"] and record["name"].startswith(filter_val))

    else:
        # ``False`` means "no filtering"; anything else is handed to loguru as-is.
        final_filter = None if filter_val is False else filter_val  # type: ignore[assignment]

    _HANDLER_ID = logger.add(
        settings.sink,  # type: ignore[arg-type]
        level=settings.level,
        filter=final_filter,  # type: ignore[arg-type]
        format=log_format,  # type: ignore[arg-type]
        enqueue=settings.enqueue,
        **settings.kwargs,
    )