1105 lines
		
	
	
		
			34 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			1105 lines
		
	
	
		
			34 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""The runtime functions and state used by compiled templates."""
 | 
						|
import functools
 | 
						|
import sys
 | 
						|
import typing as t
 | 
						|
from collections import abc
 | 
						|
from itertools import chain
 | 
						|
 | 
						|
from markupsafe import escape  # noqa: F401
 | 
						|
from markupsafe import Markup
 | 
						|
from markupsafe import soft_str
 | 
						|
 | 
						|
from .async_utils import auto_aiter
 | 
						|
from .async_utils import auto_await  # noqa: F401
 | 
						|
from .exceptions import TemplateNotFound  # noqa: F401
 | 
						|
from .exceptions import TemplateRuntimeError  # noqa: F401
 | 
						|
from .exceptions import UndefinedError
 | 
						|
from .nodes import EvalContext
 | 
						|
from .utils import _PassArg
 | 
						|
from .utils import concat
 | 
						|
from .utils import internalcode
 | 
						|
from .utils import missing
 | 
						|
from .utils import Namespace  # noqa: F401
 | 
						|
from .utils import object_type_repr
 | 
						|
from .utils import pass_eval_context
 | 
						|
 | 
						|
V = t.TypeVar("V")
 | 
						|
F = t.TypeVar("F", bound=t.Callable[..., t.Any])
 | 
						|
 | 
						|
if t.TYPE_CHECKING:
 | 
						|
    import logging
 | 
						|
    import typing_extensions as te
 | 
						|
    from .environment import Environment
 | 
						|
 | 
						|
    class LoopRenderFunc(te.Protocol):
 | 
						|
        def __call__(
 | 
						|
            self,
 | 
						|
            reciter: t.Iterable[V],
 | 
						|
            loop_render_func: "LoopRenderFunc",
 | 
						|
            depth: int = 0,
 | 
						|
        ) -> str:
 | 
						|
            ...
 | 
						|
 | 
						|
 | 
						|
# these variables are exported to the template runtime
 | 
						|
exported = [
 | 
						|
    "LoopContext",
 | 
						|
    "TemplateReference",
 | 
						|
    "Macro",
 | 
						|
    "Markup",
 | 
						|
    "TemplateRuntimeError",
 | 
						|
    "missing",
 | 
						|
    "concat",
 | 
						|
    "escape",
 | 
						|
    "markup_join",
 | 
						|
    "str_join",
 | 
						|
    "identity",
 | 
						|
    "TemplateNotFound",
 | 
						|
    "Namespace",
 | 
						|
    "Undefined",
 | 
						|
    "internalcode",
 | 
						|
]
 | 
						|
async_exported = [
 | 
						|
    "AsyncLoopContext",
 | 
						|
    "auto_aiter",
 | 
						|
    "auto_await",
 | 
						|
]
 | 
						|
 | 
						|
 | 
						|
def identity(x: V) -> V:
 | 
						|
    """Returns its argument. Useful for certain things in the
 | 
						|
    environment.
 | 
						|
    """
 | 
						|
    return x
 | 
						|
 | 
						|
 | 
						|
def markup_join(seq: t.Iterable[t.Any]) -> str:
 | 
						|
    """Concatenation that escapes if necessary and converts to string."""
 | 
						|
    buf = []
 | 
						|
    iterator = map(soft_str, seq)
 | 
						|
    for arg in iterator:
 | 
						|
        buf.append(arg)
 | 
						|
        if hasattr(arg, "__html__"):
 | 
						|
            return Markup("").join(chain(buf, iterator))
 | 
						|
    return concat(buf)
 | 
						|
 | 
						|
 | 
						|
def str_join(seq: t.Iterable[t.Any]) -> str:
 | 
						|
    """Simple args to string conversion and concatenation."""
 | 
						|
    return concat(map(str, seq))
 | 
						|
 | 
						|
 | 
						|
def unicode_join(seq: t.Iterable[t.Any]) -> str:
 | 
						|
    import warnings
 | 
						|
 | 
						|
    warnings.warn(
 | 
						|
        "This template must be recompiled with at least Jinja 3.0, or"
 | 
						|
        " it will fail in Jinja 3.1.",
 | 
						|
        DeprecationWarning,
 | 
						|
        stacklevel=2,
 | 
						|
    )
 | 
						|
    return str_join(seq)
 | 
						|
 | 
						|
 | 
						|
def new_context(
 | 
						|
    environment: "Environment",
 | 
						|
    template_name: t.Optional[str],
 | 
						|
    blocks: t.Dict[str, t.Callable[["Context"], t.Iterator[str]]],
 | 
						|
    vars: t.Optional[t.Dict[str, t.Any]] = None,
 | 
						|
    shared: bool = False,
 | 
						|
    globals: t.Optional[t.MutableMapping[str, t.Any]] = None,
 | 
						|
    locals: t.Optional[t.Mapping[str, t.Any]] = None,
 | 
						|
) -> "Context":
 | 
						|
    """Internal helper for context creation."""
 | 
						|
    if vars is None:
 | 
						|
        vars = {}
 | 
						|
    if shared:
 | 
						|
        parent = vars
 | 
						|
    else:
 | 
						|
        parent = dict(globals or (), **vars)
 | 
						|
    if locals:
 | 
						|
        # if the parent is shared a copy should be created because
 | 
						|
        # we don't want to modify the dict passed
 | 
						|
        if shared:
 | 
						|
            parent = dict(parent)
 | 
						|
        for key, value in locals.items():
 | 
						|
            if value is not missing:
 | 
						|
                parent[key] = value
 | 
						|
    return environment.context_class(
 | 
						|
        environment, parent, template_name, blocks, globals=globals
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
class TemplateReference:
 | 
						|
    """The `self` in templates."""
 | 
						|
 | 
						|
    def __init__(self, context: "Context") -> None:
 | 
						|
        self.__context = context
 | 
						|
 | 
						|
    def __getitem__(self, name: str) -> t.Any:
 | 
						|
        blocks = self.__context.blocks[name]
 | 
						|
        return BlockReference(name, self.__context, blocks, 0)
 | 
						|
 | 
						|
    def __repr__(self) -> str:
 | 
						|
        return f"<{type(self).__name__} {self.__context.name!r}>"
 | 
						|
 | 
						|
 | 
						|
def _dict_method_all(dict_method: F) -> F:
 | 
						|
    @functools.wraps(dict_method)
 | 
						|
    def f_all(self: "Context") -> t.Any:
 | 
						|
        return dict_method(self.get_all())
 | 
						|
 | 
						|
    return t.cast(F, f_all)
 | 
						|
 | 
						|
 | 
						|
@abc.Mapping.register
 | 
						|
class Context:
 | 
						|
    """The template context holds the variables of a template.  It stores the
 | 
						|
    values passed to the template and also the names the template exports.
 | 
						|
    Creating instances is neither supported nor useful as it's created
 | 
						|
    automatically at various stages of the template evaluation and should not
 | 
						|
    be created by hand.
 | 
						|
 | 
						|
    The context is immutable.  Modifications on :attr:`parent` **must not**
 | 
						|
    happen and modifications on :attr:`vars` are allowed from generated
 | 
						|
    template code only.  Template filters and global functions marked as
 | 
						|
    :func:`pass_context` get the active context passed as first argument
 | 
						|
    and are allowed to access the context read-only.
 | 
						|
 | 
						|
    The template context supports read only dict operations (`get`,
 | 
						|
    `keys`, `values`, `items`, `iterkeys`, `itervalues`, `iteritems`,
 | 
						|
    `__getitem__`, `__contains__`).  Additionally there is a :meth:`resolve`
 | 
						|
    method that doesn't fail with a `KeyError` but returns an
 | 
						|
    :class:`Undefined` object for missing variables.
 | 
						|
    """
 | 
						|
 | 
						|
    _legacy_resolve_mode: t.ClassVar[bool] = False
 | 
						|
 | 
						|
    def __init_subclass__(cls) -> None:
 | 
						|
        if "resolve_or_missing" in cls.__dict__:
 | 
						|
            # If the subclass overrides resolve_or_missing it opts in to
 | 
						|
            # modern mode no matter what.
 | 
						|
            cls._legacy_resolve_mode = False
 | 
						|
        elif "resolve" in cls.__dict__ or cls._legacy_resolve_mode:
 | 
						|
            # If the subclass overrides resolve, or if its base is
 | 
						|
            # already in legacy mode, warn about legacy behavior.
 | 
						|
            import warnings
 | 
						|
 | 
						|
            warnings.warn(
 | 
						|
                "Overriding 'resolve' is deprecated and will not have"
 | 
						|
                " the expected behavior in Jinja 3.1. Override"
 | 
						|
                " 'resolve_or_missing' instead ",
 | 
						|
                DeprecationWarning,
 | 
						|
                stacklevel=2,
 | 
						|
            )
 | 
						|
            cls._legacy_resolve_mode = True
 | 
						|
 | 
						|
    def __init__(
 | 
						|
        self,
 | 
						|
        environment: "Environment",
 | 
						|
        parent: t.Dict[str, t.Any],
 | 
						|
        name: t.Optional[str],
 | 
						|
        blocks: t.Dict[str, t.Callable[["Context"], t.Iterator[str]]],
 | 
						|
        globals: t.Optional[t.MutableMapping[str, t.Any]] = None,
 | 
						|
    ):
 | 
						|
        self.parent = parent
 | 
						|
        self.vars: t.Dict[str, t.Any] = {}
 | 
						|
        self.environment: "Environment" = environment
 | 
						|
        self.eval_ctx = EvalContext(self.environment, name)
 | 
						|
        self.exported_vars: t.Set[str] = set()
 | 
						|
        self.name = name
 | 
						|
        self.globals_keys = set() if globals is None else set(globals)
 | 
						|
 | 
						|
        # create the initial mapping of blocks.  Whenever template inheritance
 | 
						|
        # takes place the runtime will update this mapping with the new blocks
 | 
						|
        # from the template.
 | 
						|
        self.blocks = {k: [v] for k, v in blocks.items()}
 | 
						|
 | 
						|
    def super(
 | 
						|
        self, name: str, current: t.Callable[["Context"], t.Iterator[str]]
 | 
						|
    ) -> t.Union["BlockReference", "Undefined"]:
 | 
						|
        """Render a parent block."""
 | 
						|
        try:
 | 
						|
            blocks = self.blocks[name]
 | 
						|
            index = blocks.index(current) + 1
 | 
						|
            blocks[index]
 | 
						|
        except LookupError:
 | 
						|
            return self.environment.undefined(
 | 
						|
                f"there is no parent block called {name!r}.", name="super"
 | 
						|
            )
 | 
						|
        return BlockReference(name, self, blocks, index)
 | 
						|
 | 
						|
    def get(self, key: str, default: t.Any = None) -> t.Any:
 | 
						|
        """Look up a variable by name, or return a default if the key is
 | 
						|
        not found.
 | 
						|
 | 
						|
        :param key: The variable name to look up.
 | 
						|
        :param default: The value to return if the key is not found.
 | 
						|
        """
 | 
						|
        try:
 | 
						|
            return self[key]
 | 
						|
        except KeyError:
 | 
						|
            return default
 | 
						|
 | 
						|
    def resolve(self, key: str) -> t.Union[t.Any, "Undefined"]:
 | 
						|
        """Look up a variable by name, or return an :class:`Undefined`
 | 
						|
        object if the key is not found.
 | 
						|
 | 
						|
        If you need to add custom behavior, override
 | 
						|
        :meth:`resolve_or_missing`, not this method. The various lookup
 | 
						|
        functions use that method, not this one.
 | 
						|
 | 
						|
        :param key: The variable name to look up.
 | 
						|
        """
 | 
						|
        if self._legacy_resolve_mode:
 | 
						|
            if key in self.vars:
 | 
						|
                return self.vars[key]
 | 
						|
 | 
						|
            if key in self.parent:
 | 
						|
                return self.parent[key]
 | 
						|
 | 
						|
            return self.environment.undefined(name=key)
 | 
						|
 | 
						|
        rv = self.resolve_or_missing(key)
 | 
						|
 | 
						|
        if rv is missing:
 | 
						|
            return self.environment.undefined(name=key)
 | 
						|
 | 
						|
        return rv
 | 
						|
 | 
						|
    def resolve_or_missing(self, key: str) -> t.Any:
 | 
						|
        """Look up a variable by name, or return a ``missing`` sentinel
 | 
						|
        if the key is not found.
 | 
						|
 | 
						|
        Override this method to add custom lookup behavior.
 | 
						|
        :meth:`resolve`, :meth:`get`, and :meth:`__getitem__` use this
 | 
						|
        method. Don't call this method directly.
 | 
						|
 | 
						|
        :param key: The variable name to look up.
 | 
						|
        """
 | 
						|
        if self._legacy_resolve_mode:
 | 
						|
            rv = self.resolve(key)
 | 
						|
 | 
						|
            if isinstance(rv, Undefined):
 | 
						|
                return missing
 | 
						|
 | 
						|
            return rv
 | 
						|
 | 
						|
        if key in self.vars:
 | 
						|
            return self.vars[key]
 | 
						|
 | 
						|
        if key in self.parent:
 | 
						|
            return self.parent[key]
 | 
						|
 | 
						|
        return missing
 | 
						|
 | 
						|
    def get_exported(self) -> t.Dict[str, t.Any]:
 | 
						|
        """Get a new dict with the exported variables."""
 | 
						|
        return {k: self.vars[k] for k in self.exported_vars}
 | 
						|
 | 
						|
    def get_all(self) -> t.Dict[str, t.Any]:
 | 
						|
        """Return the complete context as dict including the exported
 | 
						|
        variables.  For optimizations reasons this might not return an
 | 
						|
        actual copy so be careful with using it.
 | 
						|
        """
 | 
						|
        if not self.vars:
 | 
						|
            return self.parent
 | 
						|
        if not self.parent:
 | 
						|
            return self.vars
 | 
						|
        return dict(self.parent, **self.vars)
 | 
						|
 | 
						|
    @internalcode
 | 
						|
    def call(
 | 
						|
        __self, __obj: t.Callable, *args: t.Any, **kwargs: t.Any  # noqa: B902
 | 
						|
    ) -> t.Union[t.Any, "Undefined"]:
 | 
						|
        """Call the callable with the arguments and keyword arguments
 | 
						|
        provided but inject the active context or environment as first
 | 
						|
        argument if the callable has :func:`pass_context` or
 | 
						|
        :func:`pass_environment`.
 | 
						|
        """
 | 
						|
        if __debug__:
 | 
						|
            __traceback_hide__ = True  # noqa
 | 
						|
 | 
						|
        # Allow callable classes to take a context
 | 
						|
        if (
 | 
						|
            hasattr(__obj, "__call__")  # noqa: B004
 | 
						|
            and _PassArg.from_obj(__obj.__call__) is not None  # type: ignore
 | 
						|
        ):
 | 
						|
            __obj = __obj.__call__  # type: ignore
 | 
						|
 | 
						|
        pass_arg = _PassArg.from_obj(__obj)
 | 
						|
 | 
						|
        if pass_arg is _PassArg.context:
 | 
						|
            # the active context should have access to variables set in
 | 
						|
            # loops and blocks without mutating the context itself
 | 
						|
            if kwargs.get("_loop_vars"):
 | 
						|
                __self = __self.derived(kwargs["_loop_vars"])
 | 
						|
            if kwargs.get("_block_vars"):
 | 
						|
                __self = __self.derived(kwargs["_block_vars"])
 | 
						|
            args = (__self,) + args
 | 
						|
        elif pass_arg is _PassArg.eval_context:
 | 
						|
            args = (__self.eval_ctx,) + args
 | 
						|
        elif pass_arg is _PassArg.environment:
 | 
						|
            args = (__self.environment,) + args
 | 
						|
 | 
						|
        kwargs.pop("_block_vars", None)
 | 
						|
        kwargs.pop("_loop_vars", None)
 | 
						|
 | 
						|
        try:
 | 
						|
            return __obj(*args, **kwargs)
 | 
						|
        except StopIteration:
 | 
						|
            return __self.environment.undefined(
 | 
						|
                "value was undefined because a callable raised a"
 | 
						|
                " StopIteration exception"
 | 
						|
            )
 | 
						|
 | 
						|
    def derived(self, locals: t.Optional[t.Dict[str, t.Any]] = None) -> "Context":
 | 
						|
        """Internal helper function to create a derived context.  This is
 | 
						|
        used in situations where the system needs a new context in the same
 | 
						|
        template that is independent.
 | 
						|
        """
 | 
						|
        context = new_context(
 | 
						|
            self.environment, self.name, {}, self.get_all(), True, None, locals
 | 
						|
        )
 | 
						|
        context.eval_ctx = self.eval_ctx
 | 
						|
        context.blocks.update((k, list(v)) for k, v in self.blocks.items())
 | 
						|
        return context
 | 
						|
 | 
						|
    keys = _dict_method_all(dict.keys)
 | 
						|
    values = _dict_method_all(dict.values)
 | 
						|
    items = _dict_method_all(dict.items)
 | 
						|
 | 
						|
    def __contains__(self, name: str) -> bool:
 | 
						|
        return name in self.vars or name in self.parent
 | 
						|
 | 
						|
    def __getitem__(self, key: str) -> t.Any:
 | 
						|
        """Look up a variable by name with ``[]`` syntax, or raise a
 | 
						|
        ``KeyError`` if the key is not found.
 | 
						|
        """
 | 
						|
        item = self.resolve_or_missing(key)
 | 
						|
 | 
						|
        if item is missing:
 | 
						|
            raise KeyError(key)
 | 
						|
 | 
						|
        return item
 | 
						|
 | 
						|
    def __repr__(self) -> str:
 | 
						|
        return f"<{type(self).__name__} {self.get_all()!r} of {self.name!r}>"
 | 
						|
 | 
						|
 | 
						|
class BlockReference:
 | 
						|
    """One block on a template reference."""
 | 
						|
 | 
						|
    def __init__(
 | 
						|
        self,
 | 
						|
        name: str,
 | 
						|
        context: "Context",
 | 
						|
        stack: t.List[t.Callable[["Context"], t.Iterator[str]]],
 | 
						|
        depth: int,
 | 
						|
    ) -> None:
 | 
						|
        self.name = name
 | 
						|
        self._context = context
 | 
						|
        self._stack = stack
 | 
						|
        self._depth = depth
 | 
						|
 | 
						|
    @property
 | 
						|
    def super(self) -> t.Union["BlockReference", "Undefined"]:
 | 
						|
        """Super the block."""
 | 
						|
        if self._depth + 1 >= len(self._stack):
 | 
						|
            return self._context.environment.undefined(
 | 
						|
                f"there is no parent block called {self.name!r}.", name="super"
 | 
						|
            )
 | 
						|
        return BlockReference(self.name, self._context, self._stack, self._depth + 1)
 | 
						|
 | 
						|
    @internalcode
 | 
						|
    async def _async_call(self) -> str:
 | 
						|
        rv = concat(
 | 
						|
            [x async for x in self._stack[self._depth](self._context)]  # type: ignore
 | 
						|
        )
 | 
						|
 | 
						|
        if self._context.eval_ctx.autoescape:
 | 
						|
            return Markup(rv)
 | 
						|
 | 
						|
        return rv
 | 
						|
 | 
						|
    @internalcode
 | 
						|
    def __call__(self) -> str:
 | 
						|
        if self._context.environment.is_async:
 | 
						|
            return self._async_call()  # type: ignore
 | 
						|
 | 
						|
        rv = concat(self._stack[self._depth](self._context))
 | 
						|
 | 
						|
        if self._context.eval_ctx.autoescape:
 | 
						|
            return Markup(rv)
 | 
						|
 | 
						|
        return rv
 | 
						|
 | 
						|
 | 
						|
class LoopContext:
 | 
						|
    """A wrapper iterable for dynamic ``for`` loops, with information
 | 
						|
    about the loop and iteration.
 | 
						|
    """
 | 
						|
 | 
						|
    #: Current iteration of the loop, starting at 0.
 | 
						|
    index0 = -1
 | 
						|
 | 
						|
    _length: t.Optional[int] = None
 | 
						|
    _after: t.Any = missing
 | 
						|
    _current: t.Any = missing
 | 
						|
    _before: t.Any = missing
 | 
						|
    _last_changed_value: t.Any = missing
 | 
						|
 | 
						|
    def __init__(
 | 
						|
        self,
 | 
						|
        iterable: t.Iterable[V],
 | 
						|
        undefined: t.Type["Undefined"],
 | 
						|
        recurse: t.Optional["LoopRenderFunc"] = None,
 | 
						|
        depth0: int = 0,
 | 
						|
    ) -> None:
 | 
						|
        """
 | 
						|
        :param iterable: Iterable to wrap.
 | 
						|
        :param undefined: :class:`Undefined` class to use for next and
 | 
						|
            previous items.
 | 
						|
        :param recurse: The function to render the loop body when the
 | 
						|
            loop is marked recursive.
 | 
						|
        :param depth0: Incremented when looping recursively.
 | 
						|
        """
 | 
						|
        self._iterable = iterable
 | 
						|
        self._iterator = self._to_iterator(iterable)
 | 
						|
        self._undefined = undefined
 | 
						|
        self._recurse = recurse
 | 
						|
        #: How many levels deep a recursive loop currently is, starting at 0.
 | 
						|
        self.depth0 = depth0
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def _to_iterator(iterable: t.Iterable[V]) -> t.Iterator[V]:
 | 
						|
        return iter(iterable)
 | 
						|
 | 
						|
    @property
 | 
						|
    def length(self) -> int:
 | 
						|
        """Length of the iterable.
 | 
						|
 | 
						|
        If the iterable is a generator or otherwise does not have a
 | 
						|
        size, it is eagerly evaluated to get a size.
 | 
						|
        """
 | 
						|
        if self._length is not None:
 | 
						|
            return self._length
 | 
						|
 | 
						|
        try:
 | 
						|
            self._length = len(self._iterable)  # type: ignore
 | 
						|
        except TypeError:
 | 
						|
            iterable = list(self._iterator)
 | 
						|
            self._iterator = self._to_iterator(iterable)
 | 
						|
            self._length = len(iterable) + self.index + (self._after is not missing)
 | 
						|
 | 
						|
        return self._length
 | 
						|
 | 
						|
    def __len__(self) -> int:
 | 
						|
        return self.length
 | 
						|
 | 
						|
    @property
 | 
						|
    def depth(self) -> int:
 | 
						|
        """How many levels deep a recursive loop currently is, starting at 1."""
 | 
						|
        return self.depth0 + 1
 | 
						|
 | 
						|
    @property
 | 
						|
    def index(self) -> int:
 | 
						|
        """Current iteration of the loop, starting at 1."""
 | 
						|
        return self.index0 + 1
 | 
						|
 | 
						|
    @property
 | 
						|
    def revindex0(self) -> int:
 | 
						|
        """Number of iterations from the end of the loop, ending at 0.
 | 
						|
 | 
						|
        Requires calculating :attr:`length`.
 | 
						|
        """
 | 
						|
        return self.length - self.index
 | 
						|
 | 
						|
    @property
 | 
						|
    def revindex(self) -> int:
 | 
						|
        """Number of iterations from the end of the loop, ending at 1.
 | 
						|
 | 
						|
        Requires calculating :attr:`length`.
 | 
						|
        """
 | 
						|
        return self.length - self.index0
 | 
						|
 | 
						|
    @property
 | 
						|
    def first(self) -> bool:
 | 
						|
        """Whether this is the first iteration of the loop."""
 | 
						|
        return self.index0 == 0
 | 
						|
 | 
						|
    def _peek_next(self) -> t.Any:
 | 
						|
        """Return the next element in the iterable, or :data:`missing`
 | 
						|
        if the iterable is exhausted. Only peeks one item ahead, caching
 | 
						|
        the result in :attr:`_last` for use in subsequent checks. The
 | 
						|
        cache is reset when :meth:`__next__` is called.
 | 
						|
        """
 | 
						|
        if self._after is not missing:
 | 
						|
            return self._after
 | 
						|
 | 
						|
        self._after = next(self._iterator, missing)
 | 
						|
        return self._after
 | 
						|
 | 
						|
    @property
 | 
						|
    def last(self) -> bool:
 | 
						|
        """Whether this is the last iteration of the loop.
 | 
						|
 | 
						|
        Causes the iterable to advance early. See
 | 
						|
        :func:`itertools.groupby` for issues this can cause.
 | 
						|
        The :func:`groupby` filter avoids that issue.
 | 
						|
        """
 | 
						|
        return self._peek_next() is missing
 | 
						|
 | 
						|
    @property
 | 
						|
    def previtem(self) -> t.Union[t.Any, "Undefined"]:
 | 
						|
        """The item in the previous iteration. Undefined during the
 | 
						|
        first iteration.
 | 
						|
        """
 | 
						|
        if self.first:
 | 
						|
            return self._undefined("there is no previous item")
 | 
						|
 | 
						|
        return self._before
 | 
						|
 | 
						|
    @property
 | 
						|
    def nextitem(self) -> t.Union[t.Any, "Undefined"]:
 | 
						|
        """The item in the next iteration. Undefined during the last
 | 
						|
        iteration.
 | 
						|
 | 
						|
        Causes the iterable to advance early. See
 | 
						|
        :func:`itertools.groupby` for issues this can cause.
 | 
						|
        The :func:`jinja-filters.groupby` filter avoids that issue.
 | 
						|
        """
 | 
						|
        rv = self._peek_next()
 | 
						|
 | 
						|
        if rv is missing:
 | 
						|
            return self._undefined("there is no next item")
 | 
						|
 | 
						|
        return rv
 | 
						|
 | 
						|
    def cycle(self, *args: V) -> V:
 | 
						|
        """Return a value from the given args, cycling through based on
 | 
						|
        the current :attr:`index0`.
 | 
						|
 | 
						|
        :param args: One or more values to cycle through.
 | 
						|
        """
 | 
						|
        if not args:
 | 
						|
            raise TypeError("no items for cycling given")
 | 
						|
 | 
						|
        return args[self.index0 % len(args)]
 | 
						|
 | 
						|
    def changed(self, *value: t.Any) -> bool:
 | 
						|
        """Return ``True`` if previously called with a different value
 | 
						|
        (including when called for the first time).
 | 
						|
 | 
						|
        :param value: One or more values to compare to the last call.
 | 
						|
        """
 | 
						|
        if self._last_changed_value != value:
 | 
						|
            self._last_changed_value = value
 | 
						|
            return True
 | 
						|
 | 
						|
        return False
 | 
						|
 | 
						|
    def __iter__(self) -> "LoopContext":
 | 
						|
        return self
 | 
						|
 | 
						|
    def __next__(self) -> t.Tuple[t.Any, "LoopContext"]:
 | 
						|
        if self._after is not missing:
 | 
						|
            rv = self._after
 | 
						|
            self._after = missing
 | 
						|
        else:
 | 
						|
            rv = next(self._iterator)
 | 
						|
 | 
						|
        self.index0 += 1
 | 
						|
        self._before = self._current
 | 
						|
        self._current = rv
 | 
						|
        return rv, self
 | 
						|
 | 
						|
    @internalcode
 | 
						|
    def __call__(self, iterable: t.Iterable[V]) -> str:
 | 
						|
        """When iterating over nested data, render the body of the loop
 | 
						|
        recursively with the given inner iterable data.
 | 
						|
 | 
						|
        The loop must have the ``recursive`` marker for this to work.
 | 
						|
        """
 | 
						|
        if self._recurse is None:
 | 
						|
            raise TypeError(
 | 
						|
                "The loop must have the 'recursive' marker to be called recursively."
 | 
						|
            )
 | 
						|
 | 
						|
        return self._recurse(iterable, self._recurse, depth=self.depth)
 | 
						|
 | 
						|
    def __repr__(self) -> str:
 | 
						|
        return f"<{type(self).__name__} {self.index}/{self.length}>"
 | 
						|
 | 
						|
 | 
						|
class AsyncLoopContext(LoopContext):
 | 
						|
    _iterator: t.AsyncIterator[t.Any]  # type: ignore
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def _to_iterator(  # type: ignore
 | 
						|
        iterable: t.Union[t.Iterable[V], t.AsyncIterable[V]]
 | 
						|
    ) -> t.AsyncIterator[V]:
 | 
						|
        return auto_aiter(iterable)
 | 
						|
 | 
						|
    @property
 | 
						|
    async def length(self) -> int:  # type: ignore
 | 
						|
        if self._length is not None:
 | 
						|
            return self._length
 | 
						|
 | 
						|
        try:
 | 
						|
            self._length = len(self._iterable)  # type: ignore
 | 
						|
        except TypeError:
 | 
						|
            iterable = [x async for x in self._iterator]
 | 
						|
            self._iterator = self._to_iterator(iterable)
 | 
						|
            self._length = len(iterable) + self.index + (self._after is not missing)
 | 
						|
 | 
						|
        return self._length
 | 
						|
 | 
						|
    @property
 | 
						|
    async def revindex0(self) -> int:  # type: ignore
 | 
						|
        return await self.length - self.index
 | 
						|
 | 
						|
    @property
 | 
						|
    async def revindex(self) -> int:  # type: ignore
 | 
						|
        return await self.length - self.index0
 | 
						|
 | 
						|
    async def _peek_next(self) -> t.Any:
 | 
						|
        if self._after is not missing:
 | 
						|
            return self._after
 | 
						|
 | 
						|
        try:
 | 
						|
            self._after = await self._iterator.__anext__()
 | 
						|
        except StopAsyncIteration:
 | 
						|
            self._after = missing
 | 
						|
 | 
						|
        return self._after
 | 
						|
 | 
						|
    @property
 | 
						|
    async def last(self) -> bool:  # type: ignore
 | 
						|
        return await self._peek_next() is missing
 | 
						|
 | 
						|
    @property
 | 
						|
    async def nextitem(self) -> t.Union[t.Any, "Undefined"]:
 | 
						|
        rv = await self._peek_next()
 | 
						|
 | 
						|
        if rv is missing:
 | 
						|
            return self._undefined("there is no next item")
 | 
						|
 | 
						|
        return rv
 | 
						|
 | 
						|
    def __aiter__(self) -> "AsyncLoopContext":
 | 
						|
        return self
 | 
						|
 | 
						|
    async def __anext__(self) -> t.Tuple[t.Any, "AsyncLoopContext"]:
 | 
						|
        if self._after is not missing:
 | 
						|
            rv = self._after
 | 
						|
            self._after = missing
 | 
						|
        else:
 | 
						|
            rv = await self._iterator.__anext__()
 | 
						|
 | 
						|
        self.index0 += 1
 | 
						|
        self._before = self._current
 | 
						|
        self._current = rv
 | 
						|
        return rv, self
 | 
						|
 | 
						|
 | 
						|
class Macro:
 | 
						|
    """Wraps a macro function."""
 | 
						|
 | 
						|
    def __init__(
 | 
						|
        self,
 | 
						|
        environment: "Environment",
 | 
						|
        func: t.Callable[..., str],
 | 
						|
        name: str,
 | 
						|
        arguments: t.List[str],
 | 
						|
        catch_kwargs: bool,
 | 
						|
        catch_varargs: bool,
 | 
						|
        caller: bool,
 | 
						|
        default_autoescape: t.Optional[bool] = None,
 | 
						|
    ):
 | 
						|
        self._environment = environment
 | 
						|
        self._func = func
 | 
						|
        self._argument_count = len(arguments)
 | 
						|
        self.name = name
 | 
						|
        self.arguments = arguments
 | 
						|
        self.catch_kwargs = catch_kwargs
 | 
						|
        self.catch_varargs = catch_varargs
 | 
						|
        self.caller = caller
 | 
						|
        self.explicit_caller = "caller" in arguments
 | 
						|
 | 
						|
        if default_autoescape is None:
 | 
						|
            if callable(environment.autoescape):
 | 
						|
                default_autoescape = environment.autoescape(None)
 | 
						|
            else:
 | 
						|
                default_autoescape = environment.autoescape
 | 
						|
 | 
						|
        self._default_autoescape = default_autoescape
 | 
						|
 | 
						|
    @internalcode
 | 
						|
    @pass_eval_context
 | 
						|
    def __call__(self, *args: t.Any, **kwargs: t.Any) -> str:
 | 
						|
        # This requires a bit of explanation,  In the past we used to
 | 
						|
        # decide largely based on compile-time information if a macro is
 | 
						|
        # safe or unsafe.  While there was a volatile mode it was largely
 | 
						|
        # unused for deciding on escaping.  This turns out to be
 | 
						|
        # problematic for macros because whether a macro is safe depends not
 | 
						|
        # on the escape mode when it was defined, but rather when it was used.
 | 
						|
        #
 | 
						|
        # Because however we export macros from the module system and
 | 
						|
        # there are historic callers that do not pass an eval context (and
 | 
						|
        # will continue to not pass one), we need to perform an instance
 | 
						|
        # check here.
 | 
						|
        #
 | 
						|
        # This is considered safe because an eval context is not a valid
 | 
						|
        # argument to callables otherwise anyway.  Worst case here is
 | 
						|
        # that if no eval context is passed we fall back to the compile
 | 
						|
        # time autoescape flag.
 | 
						|
        if args and isinstance(args[0], EvalContext):
 | 
						|
            autoescape = args[0].autoescape
 | 
						|
            args = args[1:]
 | 
						|
        else:
 | 
						|
            autoescape = self._default_autoescape
 | 
						|
 | 
						|
        # try to consume the positional arguments
 | 
						|
        arguments = list(args[: self._argument_count])
 | 
						|
        off = len(arguments)
 | 
						|
 | 
						|
        # For information why this is necessary refer to the handling
 | 
						|
        # of caller in the `macro_body` handler in the compiler.
 | 
						|
        found_caller = False
 | 
						|
 | 
						|
        # if the number of arguments consumed is not the number of
 | 
						|
        # arguments expected we start filling in keyword arguments
 | 
						|
        # and defaults.
 | 
						|
        if off != self._argument_count:
 | 
						|
            for name in self.arguments[len(arguments) :]:
 | 
						|
                try:
 | 
						|
                    value = kwargs.pop(name)
 | 
						|
                except KeyError:
 | 
						|
                    value = missing
 | 
						|
                if name == "caller":
 | 
						|
                    found_caller = True
 | 
						|
                arguments.append(value)
 | 
						|
        else:
 | 
						|
            found_caller = self.explicit_caller
 | 
						|
 | 
						|
        # it's important that the order of these arguments does not change
 | 
						|
        # if not also changed in the compiler's `function_scoping` method.
 | 
						|
        # the order is caller, keyword arguments, positional arguments!
 | 
						|
        if self.caller and not found_caller:
 | 
						|
            caller = kwargs.pop("caller", None)
 | 
						|
            if caller is None:
 | 
						|
                caller = self._environment.undefined("No caller defined", name="caller")
 | 
						|
            arguments.append(caller)
 | 
						|
 | 
						|
        if self.catch_kwargs:
 | 
						|
            arguments.append(kwargs)
 | 
						|
        elif kwargs:
 | 
						|
            if "caller" in kwargs:
 | 
						|
                raise TypeError(
 | 
						|
                    f"macro {self.name!r} was invoked with two values for the special"
 | 
						|
                    " caller argument. This is most likely a bug."
 | 
						|
                )
 | 
						|
            raise TypeError(
 | 
						|
                f"macro {self.name!r} takes no keyword argument {next(iter(kwargs))!r}"
 | 
						|
            )
 | 
						|
        if self.catch_varargs:
 | 
						|
            arguments.append(args[self._argument_count :])
 | 
						|
        elif len(args) > self._argument_count:
 | 
						|
            raise TypeError(
 | 
						|
                f"macro {self.name!r} takes not more than"
 | 
						|
                f" {len(self.arguments)} argument(s)"
 | 
						|
            )
 | 
						|
 | 
						|
        return self._invoke(arguments, autoescape)
 | 
						|
 | 
						|
    async def _async_invoke(self, arguments: t.List[t.Any], autoescape: bool) -> str:
 | 
						|
        rv = await self._func(*arguments)  # type: ignore
 | 
						|
 | 
						|
        if autoescape:
 | 
						|
            return Markup(rv)
 | 
						|
 | 
						|
        return rv  # type: ignore
 | 
						|
 | 
						|
    def _invoke(self, arguments: t.List[t.Any], autoescape: bool) -> str:
 | 
						|
        if self._environment.is_async:
 | 
						|
            return self._async_invoke(arguments, autoescape)  # type: ignore
 | 
						|
 | 
						|
        rv = self._func(*arguments)
 | 
						|
 | 
						|
        if autoescape:
 | 
						|
            rv = Markup(rv)
 | 
						|
 | 
						|
        return rv
 | 
						|
 | 
						|
    def __repr__(self) -> str:
 | 
						|
        name = "anonymous" if self.name is None else repr(self.name)
 | 
						|
        return f"<{type(self).__name__} {name}>"
 | 
						|
 | 
						|
 | 
						|
class Undefined:
 | 
						|
    """The default undefined type.  This undefined type can be printed and
 | 
						|
    iterated over, but every other access will raise an :exc:`UndefinedError`:
 | 
						|
 | 
						|
    >>> foo = Undefined(name='foo')
 | 
						|
    >>> str(foo)
 | 
						|
    ''
 | 
						|
    >>> not foo
 | 
						|
    True
 | 
						|
    >>> foo + 42
 | 
						|
    Traceback (most recent call last):
 | 
						|
      ...
 | 
						|
    jinja2.exceptions.UndefinedError: 'foo' is undefined
 | 
						|
    """
 | 
						|
 | 
						|
    __slots__ = (
 | 
						|
        "_undefined_hint",
 | 
						|
        "_undefined_obj",
 | 
						|
        "_undefined_name",
 | 
						|
        "_undefined_exception",
 | 
						|
    )
 | 
						|
 | 
						|
    def __init__(
 | 
						|
        self,
 | 
						|
        hint: t.Optional[str] = None,
 | 
						|
        obj: t.Any = missing,
 | 
						|
        name: t.Optional[str] = None,
 | 
						|
        exc: t.Type[TemplateRuntimeError] = UndefinedError,
 | 
						|
    ) -> None:
 | 
						|
        self._undefined_hint = hint
 | 
						|
        self._undefined_obj = obj
 | 
						|
        self._undefined_name = name
 | 
						|
        self._undefined_exception = exc
 | 
						|
 | 
						|
    @property
 | 
						|
    def _undefined_message(self) -> str:
 | 
						|
        """Build a message about the undefined value based on how it was
 | 
						|
        accessed.
 | 
						|
        """
 | 
						|
        if self._undefined_hint:
 | 
						|
            return self._undefined_hint
 | 
						|
 | 
						|
        if self._undefined_obj is missing:
 | 
						|
            return f"{self._undefined_name!r} is undefined"
 | 
						|
 | 
						|
        if not isinstance(self._undefined_name, str):
 | 
						|
            return (
 | 
						|
                f"{object_type_repr(self._undefined_obj)} has no"
 | 
						|
                f" element {self._undefined_name!r}"
 | 
						|
            )
 | 
						|
 | 
						|
        return (
 | 
						|
            f"{object_type_repr(self._undefined_obj)!r} has no"
 | 
						|
            f" attribute {self._undefined_name!r}"
 | 
						|
        )
 | 
						|
 | 
						|
    @internalcode
 | 
						|
    def _fail_with_undefined_error(
 | 
						|
        self, *args: t.Any, **kwargs: t.Any
 | 
						|
    ) -> "te.NoReturn":
 | 
						|
        """Raise an :exc:`UndefinedError` when operations are performed
 | 
						|
        on the undefined value.
 | 
						|
        """
 | 
						|
        raise self._undefined_exception(self._undefined_message)
 | 
						|
 | 
						|
    @internalcode
 | 
						|
    def __getattr__(self, name: str) -> t.Any:
 | 
						|
        if name[:2] == "__":
 | 
						|
            raise AttributeError(name)
 | 
						|
 | 
						|
        return self._fail_with_undefined_error()
 | 
						|
 | 
						|
    __add__ = __radd__ = __sub__ = __rsub__ = _fail_with_undefined_error
 | 
						|
    __mul__ = __rmul__ = __div__ = __rdiv__ = _fail_with_undefined_error
 | 
						|
    __truediv__ = __rtruediv__ = _fail_with_undefined_error
 | 
						|
    __floordiv__ = __rfloordiv__ = _fail_with_undefined_error
 | 
						|
    __mod__ = __rmod__ = _fail_with_undefined_error
 | 
						|
    __pos__ = __neg__ = _fail_with_undefined_error
 | 
						|
    __call__ = __getitem__ = _fail_with_undefined_error
 | 
						|
    __lt__ = __le__ = __gt__ = __ge__ = _fail_with_undefined_error
 | 
						|
    __int__ = __float__ = __complex__ = _fail_with_undefined_error
 | 
						|
    __pow__ = __rpow__ = _fail_with_undefined_error
 | 
						|
 | 
						|
    def __eq__(self, other: t.Any) -> bool:
 | 
						|
        return type(self) is type(other)
 | 
						|
 | 
						|
    def __ne__(self, other: t.Any) -> bool:
 | 
						|
        return not self.__eq__(other)
 | 
						|
 | 
						|
    def __hash__(self) -> int:
 | 
						|
        return id(type(self))
 | 
						|
 | 
						|
    def __str__(self) -> str:
 | 
						|
        return ""
 | 
						|
 | 
						|
    def __len__(self) -> int:
 | 
						|
        return 0
 | 
						|
 | 
						|
    def __iter__(self) -> t.Iterator[t.Any]:
 | 
						|
        yield from ()
 | 
						|
 | 
						|
    async def __aiter__(self) -> t.AsyncIterator[t.Any]:
 | 
						|
        for _ in ():
 | 
						|
            yield
 | 
						|
 | 
						|
    def __bool__(self) -> bool:
 | 
						|
        return False
 | 
						|
 | 
						|
    def __repr__(self) -> str:
 | 
						|
        return "Undefined"
 | 
						|
 | 
						|
 | 
						|
def make_logging_undefined(
 | 
						|
    logger: t.Optional["logging.Logger"] = None, base: t.Type[Undefined] = Undefined
 | 
						|
) -> t.Type[Undefined]:
 | 
						|
    """Given a logger object this returns a new undefined class that will
 | 
						|
    log certain failures.  It will log iterations and printing.  If no
 | 
						|
    logger is given a default logger is created.
 | 
						|
 | 
						|
    Example::
 | 
						|
 | 
						|
        logger = logging.getLogger(__name__)
 | 
						|
        LoggingUndefined = make_logging_undefined(
 | 
						|
            logger=logger,
 | 
						|
            base=Undefined
 | 
						|
        )
 | 
						|
 | 
						|
    .. versionadded:: 2.8
 | 
						|
 | 
						|
    :param logger: the logger to use.  If not provided, a default logger
 | 
						|
                   is created.
 | 
						|
    :param base: the base class to add logging functionality to.  This
 | 
						|
                 defaults to :class:`Undefined`.
 | 
						|
    """
 | 
						|
    if logger is None:
 | 
						|
        import logging
 | 
						|
 | 
						|
        logger = logging.getLogger(__name__)
 | 
						|
        logger.addHandler(logging.StreamHandler(sys.stderr))
 | 
						|
 | 
						|
    def _log_message(undef: Undefined) -> None:
 | 
						|
        logger.warning(  # type: ignore
 | 
						|
            "Template variable warning: %s", undef._undefined_message
 | 
						|
        )
 | 
						|
 | 
						|
    class LoggingUndefined(base):  # type: ignore
 | 
						|
        __slots__ = ()
 | 
						|
 | 
						|
        def _fail_with_undefined_error(  # type: ignore
 | 
						|
            self, *args: t.Any, **kwargs: t.Any
 | 
						|
        ) -> "te.NoReturn":
 | 
						|
            try:
 | 
						|
                super()._fail_with_undefined_error(*args, **kwargs)
 | 
						|
            except self._undefined_exception as e:
 | 
						|
                logger.error("Template variable error: %s", e)  # type: ignore
 | 
						|
                raise e
 | 
						|
 | 
						|
        def __str__(self) -> str:
 | 
						|
            _log_message(self)
 | 
						|
            return super().__str__()  # type: ignore
 | 
						|
 | 
						|
        def __iter__(self) -> t.Iterator[t.Any]:
 | 
						|
            _log_message(self)
 | 
						|
            return super().__iter__()  # type: ignore
 | 
						|
 | 
						|
        def __bool__(self) -> bool:
 | 
						|
            _log_message(self)
 | 
						|
            return super().__bool__()  # type: ignore
 | 
						|
 | 
						|
    return LoggingUndefined
 | 
						|
 | 
						|
 | 
						|
class ChainableUndefined(Undefined):
 | 
						|
    """An undefined that is chainable, where both ``__getattr__`` and
 | 
						|
    ``__getitem__`` return itself rather than raising an
 | 
						|
    :exc:`UndefinedError`.
 | 
						|
 | 
						|
    >>> foo = ChainableUndefined(name='foo')
 | 
						|
    >>> str(foo.bar['baz'])
 | 
						|
    ''
 | 
						|
    >>> foo.bar['baz'] + 42
 | 
						|
    Traceback (most recent call last):
 | 
						|
      ...
 | 
						|
    jinja2.exceptions.UndefinedError: 'foo' is undefined
 | 
						|
 | 
						|
    .. versionadded:: 2.11.0
 | 
						|
    """
 | 
						|
 | 
						|
    __slots__ = ()
 | 
						|
 | 
						|
    def __html__(self) -> str:
 | 
						|
        return str(self)
 | 
						|
 | 
						|
    def __getattr__(self, _: str) -> "ChainableUndefined":
 | 
						|
        return self
 | 
						|
 | 
						|
    __getitem__ = __getattr__  # type: ignore
 | 
						|
 | 
						|
 | 
						|
class DebugUndefined(Undefined):
 | 
						|
    """An undefined that returns the debug info when printed.
 | 
						|
 | 
						|
    >>> foo = DebugUndefined(name='foo')
 | 
						|
    >>> str(foo)
 | 
						|
    '{{ foo }}'
 | 
						|
    >>> not foo
 | 
						|
    True
 | 
						|
    >>> foo + 42
 | 
						|
    Traceback (most recent call last):
 | 
						|
      ...
 | 
						|
    jinja2.exceptions.UndefinedError: 'foo' is undefined
 | 
						|
    """
 | 
						|
 | 
						|
    __slots__ = ()
 | 
						|
 | 
						|
    def __str__(self) -> str:
 | 
						|
        if self._undefined_hint:
 | 
						|
            message = f"undefined value printed: {self._undefined_hint}"
 | 
						|
 | 
						|
        elif self._undefined_obj is missing:
 | 
						|
            message = self._undefined_name  # type: ignore
 | 
						|
 | 
						|
        else:
 | 
						|
            message = (
 | 
						|
                f"no such element: {object_type_repr(self._undefined_obj)}"
 | 
						|
                f"[{self._undefined_name!r}]"
 | 
						|
            )
 | 
						|
 | 
						|
        return f"{{{{ {message} }}}}"
 | 
						|
 | 
						|
 | 
						|
class StrictUndefined(Undefined):
 | 
						|
    """An undefined that barks on print and iteration as well as boolean
 | 
						|
    tests and all kinds of comparisons.  In other words: you can do nothing
 | 
						|
    with it except checking if it's defined using the `defined` test.
 | 
						|
 | 
						|
    >>> foo = StrictUndefined(name='foo')
 | 
						|
    >>> str(foo)
 | 
						|
    Traceback (most recent call last):
 | 
						|
      ...
 | 
						|
    jinja2.exceptions.UndefinedError: 'foo' is undefined
 | 
						|
    >>> not foo
 | 
						|
    Traceback (most recent call last):
 | 
						|
      ...
 | 
						|
    jinja2.exceptions.UndefinedError: 'foo' is undefined
 | 
						|
    >>> foo + 42
 | 
						|
    Traceback (most recent call last):
 | 
						|
      ...
 | 
						|
    jinja2.exceptions.UndefinedError: 'foo' is undefined
 | 
						|
    """
 | 
						|
 | 
						|
    __slots__ = ()
 | 
						|
    __iter__ = __str__ = __len__ = Undefined._fail_with_undefined_error
 | 
						|
    __eq__ = __ne__ = __bool__ = __hash__ = Undefined._fail_with_undefined_error
 | 
						|
    __contains__ = Undefined._fail_with_undefined_error
 | 
						|
 | 
						|
 | 
						|
# Remove slots attributes, after the metaclass is applied they are
 | 
						|
# unneeded and contain wrong data for subclasses.
 | 
						|
del (
 | 
						|
    Undefined.__slots__,
 | 
						|
    ChainableUndefined.__slots__,
 | 
						|
    DebugUndefined.__slots__,
 | 
						|
    StrictUndefined.__slots__,
 | 
						|
)
 |