opt
/
hc_python
/
lib
/
python3.12
/
site-packages
/
prometheus_client
/
Go to Home Directory
+
Upload
Create File
root@0UT1S:~$
Execute
By Order of Mr.0UT1S
[DIR] ..
N/A
[DIR] __pycache__
N/A
[DIR] bridge
N/A
[DIR] openmetrics
N/A
[DIR] twisted
N/A
__init__.py
1.77 KB
Rename
Delete
asgi.py
1.57 KB
Rename
Delete
context_managers.py
2.29 KB
Rename
Delete
core.py
860 bytes
Rename
Delete
decorator.py
15.43 KB
Rename
Delete
exposition.py
25.56 KB
Rename
Delete
gc_collector.py
1.48 KB
Rename
Delete
metrics.py
27.45 KB
Rename
Delete
metrics_core.py
15.18 KB
Rename
Delete
mmap_dict.py
5.27 KB
Rename
Delete
multiprocess.py
7.36 KB
Rename
Delete
parser.py
7.26 KB
Rename
Delete
platform_collector.py
1.83 KB
Rename
Delete
process_collector.py
3.77 KB
Rename
Delete
py.typed
0 bytes
Rename
Delete
registry.py
6.05 KB
Rename
Delete
samples.py
1.65 KB
Rename
Delete
utils.py
594 bytes
Rename
Delete
values.py
4.88 KB
Rename
Delete
from collections import defaultdict import glob import json import os import warnings from .metrics import Gauge from .metrics_core import Metric from .mmap_dict import MmapedDict from .samples import Sample from .utils import floatToGoString try: # Python3 FileNotFoundError except NameError: # Python >= 2.5 FileNotFoundError = IOError class MultiProcessCollector: """Collector for files for multi-process mode.""" def __init__(self, registry, path=None): if path is None: # This deprecation warning can go away in a few releases when removing the compatibility if 'prometheus_multiproc_dir' in os.environ and 'PROMETHEUS_MULTIPROC_DIR' not in os.environ: os.environ['PROMETHEUS_MULTIPROC_DIR'] = os.environ['prometheus_multiproc_dir'] warnings.warn("prometheus_multiproc_dir variable has been deprecated in favor of the upper case naming PROMETHEUS_MULTIPROC_DIR", DeprecationWarning) path = os.environ.get('PROMETHEUS_MULTIPROC_DIR') if not path or not os.path.isdir(path): raise ValueError('env PROMETHEUS_MULTIPROC_DIR is not set or not a directory') self._path = path if registry: registry.register(self) @staticmethod def merge(files, accumulate=True): """Merge metrics from given mmap files. By default, histograms are accumulated, as per prometheus wire format. But if writing the merged data back to mmap files, use accumulate=False to avoid compound accumulation. """ metrics = MultiProcessCollector._read_metrics(files) return MultiProcessCollector._accumulate_metrics(metrics, accumulate) @staticmethod def _read_metrics(files): metrics = {} key_cache = {} def _parse_key(key): val = key_cache.get(key) if not val: metric_name, name, labels, help_text = json.loads(key) labels_key = tuple(sorted(labels.items())) val = key_cache[key] = (metric_name, name, labels, labels_key, help_text) return val for f in files: parts = os.path.basename(f).split('_') typ = parts[0] try: file_values = MmapedDict.read_all_values_from_file(f) except FileNotFoundError: if typ == 'gauge' and parts[1].startswith('live'): # Files for 'live*' gauges can be deleted between the glob of collect # and now (via a mark_process_dead call) so don't fail if # the file is missing continue raise for key, value, timestamp, _ in file_values: metric_name, name, labels, labels_key, help_text = _parse_key(key) metric = metrics.get(metric_name) if metric is None: metric = Metric(metric_name, help_text, typ) metrics[metric_name] = metric if typ == 'gauge': pid = parts[2][:-3] metric._multiprocess_mode = parts[1] metric.add_sample(name, labels_key + (('pid', pid),), value, timestamp) else: # The duplicates and labels are fixed in the next for. metric.add_sample(name, labels_key, value) return metrics @staticmethod def _accumulate_metrics(metrics, accumulate): for metric in metrics.values(): samples = defaultdict(float) sample_timestamps = defaultdict(float) buckets = defaultdict(lambda: defaultdict(float)) samples_setdefault = samples.setdefault for s in metric.samples: name, labels, value, timestamp, exemplar = s if metric.type == 'gauge': without_pid_key = (name, tuple(l for l in labels if l[0] != 'pid')) if metric._multiprocess_mode in ('min', 'livemin'): current = samples_setdefault(without_pid_key, value) if value < current: samples[without_pid_key] = value elif metric._multiprocess_mode in ('max', 'livemax'): current = samples_setdefault(without_pid_key, value) if value > current: samples[without_pid_key] = value elif metric._multiprocess_mode in ('sum', 'livesum'): samples[without_pid_key] += value elif metric._multiprocess_mode in ('mostrecent', 'livemostrecent'): current_timestamp = sample_timestamps[without_pid_key] timestamp = float(timestamp or 0) if current_timestamp < timestamp: samples[without_pid_key] = value sample_timestamps[without_pid_key] = timestamp else: # all/liveall samples[(name, labels)] = value elif metric.type == 'histogram': # A for loop with early exit is faster than a genexpr # or a listcomp that ends up building unnecessary things for l in labels: if l[0] == 'le': bucket_value = float(l[1]) # _bucket without_le = tuple(l for l in labels if l[0] != 'le') buckets[without_le][bucket_value] += value break else: # did not find the `le` key # _sum/_count samples[(name, labels)] += value else: # Counter and Summary. samples[(name, labels)] += value # Accumulate bucket values. if metric.type == 'histogram': for labels, values in buckets.items(): acc = 0.0 for bucket, value in sorted(values.items()): sample_key = ( metric.name + '_bucket', labels + (('le', floatToGoString(bucket)),), ) if accumulate: acc += value samples[sample_key] = acc else: samples[sample_key] = value if accumulate: samples[(metric.name + '_count', labels)] = acc # Convert to correct sample format. metric.samples = [Sample(name_, dict(labels), value) for (name_, labels), value in samples.items()] return metrics.values() def collect(self): files = glob.glob(os.path.join(self._path, '*.db')) return self.merge(files, accumulate=True) _LIVE_GAUGE_MULTIPROCESS_MODES = {m for m in Gauge._MULTIPROC_MODES if m.startswith('live')} def mark_process_dead(pid, path=None): """Do bookkeeping for when one process dies in a multi-process setup.""" if path is None: path = os.environ.get('PROMETHEUS_MULTIPROC_DIR', os.environ.get('prometheus_multiproc_dir')) for mode in _LIVE_GAUGE_MULTIPROCESS_MODES: for f in glob.glob(os.path.join(path, f'gauge_{mode}_{pid}.db')): os.remove(f)
Save