opt
/
hc_python
/
lib
/
python3.12
/
site-packages
/
alembic
/
util
/
Go to Home Directory
+
Upload
Create File
root@0UT1S:~$
Execute
By Order of Mr.0UT1S
[DIR] ..
N/A
[DIR] __pycache__
N/A
__init__.py
1.43 KB
Rename
Delete
compat.py
2.57 KB
Rename
Delete
editor.py
2.49 KB
Rename
Delete
exc.py
564 bytes
Rename
Delete
langhelpers.py
9.79 KB
Rename
Delete
messaging.py
3.09 KB
Rename
Delete
pyfiles.py
3.41 KB
Rename
Delete
sqla_compat.py
14.44 KB
Rename
Delete
from __future__ import annotations from collections.abc import Iterable from contextlib import contextmanager import logging import sys import textwrap from typing import Iterator from typing import Optional from typing import TextIO from typing import Union import warnings from sqlalchemy.engine import url log = logging.getLogger(__name__) # disable "no handler found" errors logging.getLogger("alembic").addHandler(logging.NullHandler()) try: import fcntl import termios import struct ioctl = fcntl.ioctl(0, termios.TIOCGWINSZ, struct.pack("HHHH", 0, 0, 0, 0)) _h, TERMWIDTH, _hp, _wp = struct.unpack("HHHH", ioctl) if TERMWIDTH <= 0: # can occur if running in emacs pseudo-tty TERMWIDTH = None except (ImportError, OSError): TERMWIDTH = None def write_outstream( stream: TextIO, *text: Union[str, bytes], quiet: bool = False ) -> None: if quiet: return encoding = getattr(stream, "encoding", "ascii") or "ascii" for t in text: if not isinstance(t, bytes): t = t.encode(encoding, "replace") t = t.decode(encoding) try: stream.write(t) except OSError: # suppress "broken pipe" errors. # no known way to handle this on Python 3 however # as the exception is "ignored" (noisily) in TextIOWrapper. break @contextmanager def status( status_msg: str, newline: bool = False, quiet: bool = False ) -> Iterator[None]: msg(status_msg + " ...", newline, flush=True, quiet=quiet) try: yield except: if not quiet: write_outstream(sys.stdout, " FAILED\n") raise else: if not quiet: write_outstream(sys.stdout, " done\n") def err(message: str, quiet: bool = False) -> None: log.error(message) msg(f"FAILED: {message}", quiet=quiet) sys.exit(-1) def obfuscate_url_pw(input_url: str) -> str: return url.make_url(input_url).render_as_string(hide_password=True) def warn(msg: str, stacklevel: int = 2) -> None: warnings.warn(msg, UserWarning, stacklevel=stacklevel) def msg( msg: str, newline: bool = True, flush: bool = False, quiet: bool = False ) -> None: if quiet: return if TERMWIDTH is None: write_outstream(sys.stdout, msg) if newline: write_outstream(sys.stdout, "\n") else: # left indent output lines indent = " " lines = textwrap.wrap( msg, TERMWIDTH, initial_indent=indent, subsequent_indent=indent, ) if len(lines) > 1: for line in lines[0:-1]: write_outstream(sys.stdout, line, "\n") write_outstream(sys.stdout, lines[-1], ("\n" if newline else "")) if flush: sys.stdout.flush() def format_as_comma(value: Optional[Union[str, Iterable[str]]]) -> str: if value is None: return "" elif isinstance(value, str): return value elif isinstance(value, Iterable): return ", ".join(value) else: raise ValueError("Don't know how to comma-format %r" % value)
Save