opt
/
hc_python
/
lib
/
python3.12
/
site-packages
/
prometheus_client
/
Go to Home Directory
+
Upload
Create File
root@0UT1S:~$
Execute
By Order of Mr.0UT1S
[DIR] ..
N/A
[DIR] __pycache__
N/A
[DIR] bridge
N/A
[DIR] openmetrics
N/A
[DIR] twisted
N/A
__init__.py
1.77 KB
Rename
Delete
asgi.py
1.57 KB
Rename
Delete
context_managers.py
2.29 KB
Rename
Delete
core.py
860 bytes
Rename
Delete
decorator.py
15.43 KB
Rename
Delete
exposition.py
25.56 KB
Rename
Delete
gc_collector.py
1.48 KB
Rename
Delete
metrics.py
27.45 KB
Rename
Delete
metrics_core.py
15.18 KB
Rename
Delete
mmap_dict.py
5.27 KB
Rename
Delete
multiprocess.py
7.36 KB
Rename
Delete
parser.py
7.26 KB
Rename
Delete
platform_collector.py
1.83 KB
Rename
Delete
process_collector.py
3.77 KB
Rename
Delete
py.typed
0 bytes
Rename
Delete
registry.py
6.05 KB
Rename
Delete
samples.py
1.65 KB
Rename
Delete
utils.py
594 bytes
Rename
Delete
values.py
4.88 KB
Rename
Delete
import io as StringIO
import re
from typing import Dict, Iterable, List, Match, Optional, TextIO, Tuple

from .metrics_core import Metric
from .samples import Sample


def text_string_to_metric_families(text: str) -> Iterable[Metric]:
    """Parse Prometheus text format from a unicode string.

    See text_fd_to_metric_families.
    """
    yield from text_fd_to_metric_families(StringIO.StringIO(text))


# Maps each recognised two-character escape sequence to the character it
# stands for.
ESCAPE_SEQUENCES = {
    '\\\\': '\\',
    '\\n': '\n',
    '\\"': '"',
}


def replace_escape_sequence(match: Match[str]) -> str:
    """Return the unescaped character for a matched escape sequence.

    Intended as the replacement callback for ``re.sub``; the matched text
    must be a key of ESCAPE_SEQUENCES.
    """
    return ESCAPE_SEQUENCES[match.group(0)]


# HELP text is unescaped for backslash and newline only; the general
# pattern (used for label values) additionally unescapes the double quote.
HELP_ESCAPING_RE = re.compile(r'\\[\\n]')
ESCAPING_RE = re.compile(r'\\[\\n"]')


def _replace_help_escaping(s: str) -> str:
    """Unescape ``\\\\`` and ``\\n`` sequences in a HELP string."""
    return HELP_ESCAPING_RE.sub(replace_escape_sequence, s)


def _replace_escaping(s: str) -> str:
    """Unescape ``\\\\``, ``\\n`` and ``\\"`` sequences in *s*."""
    return ESCAPING_RE.sub(replace_escape_sequence, s)


def _is_character_escaped(s: str, charpos: int) -> bool:
    """Tell whether the character at *charpos* in *s* is backslash-escaped.

    Counts the run of backslashes immediately before *charpos*: an odd
    count means the last backslash escapes the character, an even count
    means the backslashes escape each other.
    """
    num_bslashes = 0
    while (charpos > num_bslashes
           and s[charpos - 1 - num_bslashes] == '\\'):
        num_bslashes += 1
    return num_bslashes % 2 == 1


def _parse_labels(labels_string: str) -> Dict[str, str]:
    """Parse the text between the braces of a sample into a label dict.

    Args:
        labels_string: The label section without the surrounding braces,
            e.g. ``a="1",b="2"``.

    Returns:
        A dict mapping stripped label names to unescaped label values.
        Empty when *labels_string* contains no ``=`` at all.

    Raises:
        ValueError: If an expected ``=`` or ``"`` cannot be found.
    """
    labels: Dict[str, str] = {}
    # Return if we don't have valid labels
    if "=" not in labels_string:
        return labels

    # Unescaping is only attempted when a backslash is present at all.
    escaping = "\\" in labels_string

    # Remaining, not yet processed part of the labels
    sub_labels = labels_string
    try:
        # Process one label at a time
        while sub_labels:
            # The label name is before the equal
            value_start = sub_labels.index("=")
            label_name = sub_labels[:value_start]
            sub_labels = sub_labels[value_start + 1:].lstrip()
            # Find the first quote after the equal
            quote_start = sub_labels.index('"') + 1
            value_substr = sub_labels[quote_start:]

            # Find the first unescaped quote: it closes the value
            i = 0
            while i < len(value_substr):
                i = value_substr.index('"', i)
                if not _is_character_escaped(value_substr, i):
                    break
                i += 1

            # The label value is between the opening and closing quote
            quote_end = i + 1
            label_value = sub_labels[quote_start:quote_end]
            # Replace escaping if needed
            if escaping:
                label_value = _replace_escaping(label_value)
            labels[label_name.strip()] = label_value

            # Remove the processed label from the sub-slice for next iteration
            sub_labels = sub_labels[quote_end + 1:]
            # find() returns -1 when there is no comma, which makes
            # next_comma 0 so that nothing is skipped.
            next_comma = sub_labels.find(",") + 1
            sub_labels = sub_labels[next_comma:].lstrip()

        return labels

    except ValueError as err:
        raise ValueError("Invalid labels: %s" % labels_string) from err


def _parse_value_and_timestamp(s: str) -> Tuple[float, Optional[float]]:
    """Parse the part of a sample line that follows the name and labels.

    Tokens may be separated by any mix of spaces and tabs. The first token
    is the value. When more than one token is present, the last one is the
    timestamp in milliseconds and is returned in seconds.

    Raises:
        ValueError: If there is no token or a token is not a valid float.
    """
    s = s.lstrip()
    # Split on any run of whitespace so that lines mixing spaces and tabs
    # are tokenised correctly.
    values = s.split()
    if not values:
        # Nothing to parse: float() raises the ValueError for us.
        return float(s), None
    value = float(values[0])
    timestamp = (float(values[-1]) / 1000) if len(values) > 1 else None
    return value, timestamp


def _parse_sample(text: str) -> Sample:
    """Parse one sample line into a Sample.

    Args:
        text: A single non-comment, non-blank line.

    Returns:
        A Sample built from the name, labels (``{}`` when the line has no
        label section), value and optional timestamp.

    Raises:
        ValueError: If the labels, the value or the timestamp are invalid,
            or if a label-less line has nothing after the name.
    """
    # Detect the labels in the text. Only the brace lookup is guarded, so
    # that errors raised while parsing the labels or the value propagate
    # instead of being mistaken for "this line has no labels".
    try:
        label_start, label_end = text.index("{"), text.rindex("}")
    except ValueError:
        # We don't have labels: the name ends at the first whitespace
        # character, whether that is a space or a tab.
        fields = text.split(None, 1)
        if len(fields) < 2:
            raise ValueError("Invalid sample: %s" % text) from None
        name, remainder = fields
        # The value is after the name
        value, timestamp = _parse_value_and_timestamp(remainder)
        return Sample(name, {}, value, timestamp)

    # The name is before the labels
    name = text[:label_start].strip()
    # We ignore the starting curly brace
    label = text[label_start + 1:label_end]
    # The value is after the label end (ignoring curly brace)
    value, timestamp = _parse_value_and_timestamp(text[label_end + 1:])
    return Sample(name, _parse_labels(label), value, timestamp)


def text_fd_to_metric_families(fd: TextIO) -> Iterable[Metric]:
    """Parse Prometheus text format from a file descriptor.

    This is a laxer parser than the main Go parser,
    so successful parsing does not imply that the parsed
    text meets the specification.

    Yields Metric's.

    Raises:
        ValueError: If a ``# TYPE`` line names a metric but gives no type,
            or if a sample line cannot be parsed.
    """
    name = ''
    documentation = ''
    typ = 'untyped'
    samples: List[Sample] = []
    # Sample names accepted as belonging to the metric being collected
    allowed_names: List[str] = []

    def build_metric(name: str, documentation: str, typ: str, samples: List[Sample]) -> Metric:
        # Munge counters into OpenMetrics representation
        # used internally.
        if typ == 'counter':
            if name.endswith('_total'):
                name = name[:-6]
            else:
                samples = [Sample(s[0] + '_total', *s[1:]) for s in samples]
        metric = Metric(name, documentation, typ)
        metric.samples = samples
        return metric

    for line in fd:
        line = line.strip()

        if line.startswith('#'):
            parts = line.split(None, 3)
            if len(parts) < 3 or parts[1] not in ('HELP', 'TYPE'):
                # Ignore other comment tokens, as well as HELP/TYPE
                # lines that do not even name a metric.
                continue
            if parts[1] == 'HELP':
                if parts[2] != name:
                    if name != '':
                        yield build_metric(name, documentation, typ, samples)
                    # New metric
                    name = parts[2]
                    typ = 'untyped'
                    samples = []
                    allowed_names = [parts[2]]
                if len(parts) == 4:
                    documentation = _replace_help_escaping(parts[3])
                else:
                    documentation = ''
            else:
                # TYPE line
                if parts[2] != name:
                    if name != '':
                        yield build_metric(name, documentation, typ, samples)
                    # New metric
                    name = parts[2]
                    documentation = ''
                    samples = []
                if len(parts) < 4:
                    # Report a missing type as a parse error rather than
                    # letting an IndexError escape.
                    raise ValueError("Invalid line: %s" % line)
                typ = parts[3]
                suffixes = {
                    'counter': [''],
                    'gauge': [''],
                    'summary': ['_count', '_sum', ''],
                    'histogram': ['_count', '_sum', '_bucket'],
                }.get(typ, [''])
                allowed_names = [name + n for n in suffixes]
        elif line == '':
            # Ignore blank lines
            pass
        else:
            sample = _parse_sample(line)
            if sample.name not in allowed_names:
                if name != '':
                    yield build_metric(name, documentation, typ, samples)
                # New metric, yield immediately as untyped singleton
                name = ''
                documentation = ''
                typ = 'untyped'
                samples = []
                allowed_names = []
                yield build_metric(sample[0], documentation, typ, [sample])
            else:
                samples.append(sample)

    if name != '':
        yield build_metric(name, documentation, typ, samples)
Save