- GRAYBYTE UNDETECTABLE CODES -

403Webshell
Server IP : 184.154.167.98  /  Your IP : 18.220.222.188
Web Server : Apache
System : Linux pink.dnsnetservice.com 4.18.0-553.22.1.lve.1.el8.x86_64 #1 SMP Tue Oct 8 15:52:54 UTC 2024 x86_64
User : puertode ( 1767)
PHP Version : 7.2.34
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : ON
Directory :  /usr/share/crypto-policies/python/cryptopolicies/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /usr/share/crypto-policies/python/cryptopolicies/cryptopolicies.py
# SPDX-License-Identifier: LGPL-2.1-or-later

# Copyright (c) 2019 Red Hat, Inc.
# Copyright (c) 2019 Tomáš Mráz <tmraz@fedoraproject.org>

import collections
import enum
import fnmatch
import os
import re
import warnings

from . import alg_lists
from . import validation  # moved out of the way to not obscure the flow


# Defaults of integer property values (doubles as an allowlist)

INT_DEFAULTS = {k: 0 for k in (
    'arbitrary_dh_groups',
    'min_dh_size', 'min_dsa_size', 'min_rsa_size',
    'sha1_in_certs',
    'ssh_certs', 'ssh_etm',
)}


# Scopes (`@!ipsec`) and matching them

SCOPE_ANY = '*'

ALL_SCOPES = (  # defined explicitly to catch typos / globbing nothing
    'tls', 'ssl', 'openssl', 'nss', 'gnutls', 'java-tls',
    'ssh', 'openssh', 'openssh-server', 'openssh-client', 'libssh',
    'ipsec', 'ike', 'libreswan',
    'kerberos', 'krb5',
    'dnssec', 'bind',
)
DUMPABLE_SCOPES = {  # TODO: fix duplication, backends specify same things
    'bind': {'bind', 'dnssec'},
    'gnutls': {'gnutls', 'tls', 'ssl'},
    'java-tls': {'java-tls', 'tls', 'ssl'},
    'krb5': {'krb5', 'kerberos'},
    'libreswan': {'ipsec', 'ike', 'libreswan'},
    'libssh': {'libssh', 'openssh', 'ssh'},
    'nss': {'nss', 'tls', 'ssl'},
    'openssh-client': {'openssh-client', 'openssh', 'ssh'},
    'openssh-server': {'openssh-server', 'openssh', 'ssh'},
    'openssl': {'openssl', 'tls', 'ssl'},
}


class ScopeSelector:
    def __init__(self, pattern=SCOPE_ANY):
        """
        Initialize a scope selector.
        An example would be `ssh` in `ciphers@ssh = -NULL`.
        When openssh backend will request the configuration,
        it'll offer (`{'ssh', 'openssh'}`) as scopes
        and the rule above will be taken into account.
        Both patterns and scopes are cast to lowercase.
        For more examples, refer to tests/unit/parsing/test_scope_selector.py
        >>> ss = ScopeSelector('!{SSH,IPsec}')
        >>> ss.matches({'ipsec', 'libreswan'})
        False
        >>> ss.matches({'tls', 'openssl'})
        True
        """
        self.pattern = pattern = pattern.lower()
        self._positive = not pattern.startswith('!')
        p = pattern if self._positive else pattern[1:]

        validation.scope.illegal_characters(p, original_pattern=self.pattern)
        validation.scope.curly_brackets(p, original_pattern=self.pattern)

        self._globs = p[1:-1].split(',') if p.startswith('{') else [p]

        validation.scope.resulting_globs(self._globs, ALL_SCOPES,
                                         original_pattern=self.pattern)

    def __str__(self):
        return f'<ScopeSelector pattern={repr(self.pattern)}>'

    def matches(self, scopes):
        """
        Checks whether ScopeSelector matches one of the scopes.
        For more examples, refer to tests/unit/parsing/test_scope_selector.py
        >>> ScopeSelector('{SSH,IPsec}').matches({'ipsec', 'libreswan'})
        True
        >>> ScopeSelector('!{SSH,IPsec}').matches({'ipsec', 'libreswan'})
        False
        """
        if self.pattern == SCOPE_ANY:  # matches even an empty set
            return True
        scopes = [s.lower() for s in scopes]
        assert all(s in ALL_SCOPES for s in scopes)  # supplied by backends
        if self._positive:
            return any(fnmatch.filter(scopes, g) for g in self._globs)
        return all(not fnmatch.filter(scopes, g) for g in self._globs)


# Operations: interpreting right hand sides of (sub)policy files

class Operation(enum.Enum):
    """
    An operation that comes with the right-hand value of the directive.
    """
    RESET = 1     # cipher =
    PREPEND = 2   # cipher = +NULL
    APPEND = 3    # cipher = NULL+
    OMIT = 4      # cipher = -NULL
    SET_INT = 5   # sha1_in_certs = 0; setting to something that's all digits


def parse_rhs(rhs, prop_name):
    """
    Parses right-hand parts of the directives
    into lists of operation/value pairs.
    For more examples, refer to tests/unit/test_parsing.py
    >>> parse_rhs('', 'cipher')
    [(<Operation.RESET: 1>, None)]
    >>> parse_rhs('IDEA-CBC SEED-CBC', 'cipher')
    [(<Operation.RESET: 1>, None),
     (<Operation.APPEND: 3>, 'IDEA-CBC'),
     (<Operation.APPEND: 3>, 'SEED-CBC')]
    >>> # 3DES-CBC gets prepended last for higher prio
    >>> parse_rhs('+*DES-CBC', 'cipher')
    [(<Operation.PREPEND: 2>, 'DES-CBC'),
     (<Operation.PREPEND: 2>, '3DES-CBC')]
    """
    def differential(v):
        return v.startswith('+') or v.endswith('+') or v.startswith('-')

    if rhs.isdigit():
        if prop_name not in alg_lists.ALL and prop_name in INT_DEFAULTS:
            return [(Operation.SET_INT, int(rhs))]
        elif prop_name in alg_lists.ALL:
            raise validation.rules.NonIntPropertyIntValueError(prop_name)
        else:
            assert prop_name not in alg_lists.ALL
            assert prop_name not in INT_DEFAULTS
            # pass for now, it's gonna be caught as non-existing algclass
    else:
        if prop_name in INT_DEFAULTS:
            raise validation.rules.IntPropertyNonIntValueError(prop_name)

    values = rhs.split()

    if not any(differential(v) for v in values):  # Setting something anew
        values = sum([alg_lists.glob(v, prop_name) for v in values], [])
        return ([(Operation.RESET, None)]
                + [(Operation.APPEND, v) for v in values])
    elif all(differential(v) for v in values):  # Modifying an existing list
        operations = []
        for value in values:
            if value.startswith('+'):
                op = Operation.PREPEND
                unglob = alg_lists.glob(value[1:], prop_name)[::-1]
            elif value.endswith('+'):
                op = Operation.APPEND
                unglob = alg_lists.glob(value[:-1], prop_name)[::-1]
            else:
                assert value.startswith('-')
                op = Operation.OMIT
                unglob = alg_lists.glob(value[1:], prop_name)
            operations.extend([(op, v) for v in unglob])
        return operations
    else:  # Forbidden to mix them on one line
        raise validation.rules.MixedDifferentialNonDifferentialError(rhs)


# Directives: interpreting lines of (sub)policy files

Directive = collections.namedtuple('Directive', (
    'prop_name', 'scope', 'operation', 'value'
))


def parse_line(line):
    """
    Parses configuration lines into tuples of directives.
    For more examples, refer to tests/unit/test_parsing.py
    >>> parse_line('cipher@TLS = RC4* NULL')
    [Directive(prop_name='cipher', scope='tls',
               operation=<Operation.RESET: 1>, value=None),
     Directive(prop_name='cipher', scope='tls',
               operation=<Operation.APPEND: 3>, value='RC4-40'),
     Directive(prop_name='cipher', scope='tls',
               operation=<Operation.APPEND: 3>, value='RC4-128'),
     Directive(prop_name='cipher', scope='tls',
               operation=<Operation.APPEND: 3>, value='NULL')]
    """
    if not line.strip():
        return []
    validation.rules.count_equals_signs(line)

    lhs, rhs = line.split('=')
    lhs, rhs = lhs.strip(), rhs.strip()
    validation.rules.empty_lhs(lhs, line)

    prop_name, scope = lhs.split('@', 1) if '@' in lhs else (lhs, SCOPE_ANY)

    return [Directive(prop_name=prop_name, scope=scope.lower(),
                      operation=operation, value=value)
            for operation, value in parse_rhs(rhs, prop_name)]


def syntax_check_line(line, warn=False):
    try:
        l = parse_line(line)
        for d in l:
            ScopeSelector(d.scope)  # attempt parsing
    except validation.PolicySyntaxError as ex:
        if not warn:
            raise
        warnings.warn(ex)


class PolicySyntaxDeprecationWarning(FutureWarning):
    def __init__(self, deprecated, replacement):
        replacement = replacement.replace('\n', ' and ')
        msg = f'option {deprecated} is deprecated'
        msg += f', please rewrite your rules using {replacement}; '
        msg += 'be advised that it is not always a 1-1 replacement'
        super().__init__(msg)


def preprocess_text(text):
    r"""
    Preprocesses text before parsing.
    Fixes line breaks, handles backwards compatibility.
    >>> preprocess_text('cipher = c1 \\ \nc2#x')
    'cipher = c1 c2'
    >>> with warnings.catch_warnings():
    ...     warnings.simplefilter("ignore")
    ...     preprocess_text('ike_protocol = IKEv2')
    'protocol@IKE = IKEv2'
    >>> with warnings.catch_warnings():
    ...     warnings.simplefilter("ignore")
    ...     preprocess_text('min_tls_version=TLS1.3')
    'protocol@TLS = -SSL2.0 -SSL3.0 -TLS1.0 -TLS1.1 -TLS1.2'
    """
    text = re.sub(r'#.*', '', text)
    text = text.replace('=', ' = ')
    text = '\n'.join((l.strip() for l in text.split('\n')))
    text = text.replace('\\\n', '')
    text = '\n'.join((l.strip() for l in text.split('\n')))
    text = '\n'.join((re.sub(r'\s+', ' ', l) for l in text.split('\n')))
    text = re.sub('\n+', '\n', text).strip()

    if re.findall(r'\bprotocol\s*=', text):
        warnings.warn(PolicySyntaxDeprecationWarning('protocol',
                                                     'protocol@TLS'))

    POSTFIX_REPLACEMENTS = {
        'tls_cipher': 'cipher@TLS',
        'ssh_cipher': 'cipher@SSH',
        'ssh_group': 'group@SSH',
        'ike_protocol': 'protocol@IKE',
    }
    for fr, to in POSTFIX_REPLACEMENTS.items():
        regex = r'\b' + fr + r'\s*=(.*)'
        ms = re.findall(regex, text)
        if ms:
            warnings.warn(PolicySyntaxDeprecationWarning(fr, to))
        text = re.sub(regex, '', text)
        for m in ms:
            text += f'\n\n{to} ={m}'
    text = re.sub('\n+', '\n', text).strip()

    PLAIN_REPLACEMENTS = {
        'sha1_in_dnssec = 0':
            'hash@DNSSec = -SHA1\nsign@DNSSec = -RSA-SHA1 -ECDSA-SHA1',
        'sha1_in_dnssec = 1':
            'hash@DNSSec = SHA1+\nsign@DNSSec = RSA-SHA1+ ECDSA-SHA1+',
    }
    for fr, to in PLAIN_REPLACEMENTS.items():
        regex = r'\b' + fr + r'\b'
        if re.search(regex, text):
            warnings.warn(PolicySyntaxDeprecationWarning(fr, to))
        text = re.sub(regex, to, text)

    dtls_versions = list(alg_lists.DTLS_PROTOCOLS[::-1])
    while dtls_versions:
        neg = " ".join(("-" + v for v in dtls_versions[:-1]))
        text = re.sub(r'\bmin_dtls_version = ' + dtls_versions[-1] + r'\b',
                      f'protocol@TLS = {neg}' if neg else '', text)
        dtls_versions.pop()
    text = re.sub(r'\bmin_dtls_version = 0\b', '', text)

    tls_versions = list(alg_lists.TLS_PROTOCOLS[::-1])
    while tls_versions:
        neg = " ".join(("-" + v for v in tls_versions[:-1]))
        text = re.sub(r'\bmin_tls_version = ' + tls_versions[-1] + r'\b',
                      f'protocol@TLS = {neg}' if neg else '', text)
        tls_versions.pop()
    text = re.sub(r'\bmin_tls_version = 0\b', '', text)

    return text


# Finally, constructing a policy

class ScopedPolicy:
    """
    An entity constructing lists of what's `.enabled` and what's `.disabled`
    when the given scopes are active.
    >>> sp = ScopedPolicy(parse_line('cipher@TLS = RC4* NULL'), {'tls'})
    >>> 'AES-256-GCM' in sp.disabled['cipher']
    True
    >>> sp.enabled['cipher']
    ['RC4-40', 'RC4-128', 'NULL']
    >>> ScopedPolicy(parse_line('min_dh_size=2048')).integers['min_dh_size']
    2048
    """
    def __init__(self, directives, relevant_scopes=None):
        relevant_scopes = relevant_scopes or set()
        self.integers = INT_DEFAULTS.copy()
        self.enabled = {prop_name: [] for prop_name in alg_lists.ALL}

        for directive in directives:
            # TODO: validate that the target exists
            ss = ScopeSelector(directive.scope)
            if ss.matches(relevant_scopes):
                if directive.operation == Operation.RESET:
                    self.enabled[directive.prop_name] = []
                elif directive.operation == Operation.APPEND:
                    enabled = self.enabled[directive.prop_name]
                    if directive.value not in enabled:
                        enabled.append(directive.value)
                elif directive.operation == Operation.PREPEND:
                    enabled = self.enabled[directive.prop_name]
                    # in case of duplicates, remove the latter, lower-prio ones
                    if directive.value in enabled:
                        enabled.remove(directive.value)
                    enabled.insert(0, directive.value)
                elif directive.operation == Operation.OMIT:
                    self.enabled[directive.prop_name] = [
                        e for e in self.enabled[directive.prop_name]
                        if e != directive.value
                    ]
                else:
                    assert directive.operation == Operation.SET_INT
                    self.integers[directive.prop_name] = directive.value
        assert len(self.enabled) == len(set(self.enabled))

        self.disabled = {prop_name: [e for e in alg_lists.ALL[prop_name]
                                     if e not in self.enabled[prop_name]]
                         for prop_name in alg_lists.ALL}

    @property
    def min_tls_version(self):
        return alg_lists.min_tls_version(self.enabled['protocol'])

    @property
    def max_tls_version(self):
        return alg_lists.max_tls_version(self.enabled['protocol'])

    @property
    def min_dtls_version(self):
        return alg_lists.min_dtls_version(self.enabled['protocol'])

    @property
    def max_dtls_version(self):
        return alg_lists.max_dtls_version(self.enabled['protocol'])


# Locating policy files

def lookup_file(policyname, fname, paths):
    for d in paths:
        p = os.path.join(d, fname)
        if os.access(p, os.R_OK):
            return p
    raise validation.PolicyFileNotFoundError(policyname, fname, paths)


# main class

class UnscopedCryptoPolicy:
    CONFIG_DIR = '/etc/crypto-policies'

    SHARE_DIR = '/usr/share/crypto-policies'

    def __init__(self, policy_name, *subpolicy_names, policydir=None):
        self.policydir = policydir
        self.policyname = ':'.join((policy_name,) + subpolicy_names)

        self.lines = []

        directives = self.read_policy_file(policy_name)
        for subpolicy_name in subpolicy_names:
            directives += self.read_policy_file(subpolicy_name, subpolicy=True)
        self._directives = directives

    def is_empty(self):
        return not self._directives

    def scoped(self, scopes=None):
        return ScopedPolicy(self._directives, scopes or {})

    def read_policy_file(self, name, subpolicy=False):
        pdir = self.policydir or 'policies'
        if subpolicy:
            pdir = os.path.join(pdir, 'modules')
        p = lookup_file(name,
                        name + ('.pol' if not subpolicy else '.pmod'), (
                            os.path.curdir,
                            pdir,
                            os.path.join(self.CONFIG_DIR, pdir),
                            os.path.join(self.SHARE_DIR, pdir),
                        ))
        # TODO: error handling
        with open(p) as f:
            text = f.read()
        text = preprocess_text(text)
        lines = text.split('\n')
        for l in lines:  # display several warnings at once
            syntax_check_line(l, warn=True)
        for l in lines:  # crash
            syntax_check_line(l)
        return sum([parse_line(l) for l in lines], [])

    def __str__(self):
        def fmt(key, value):
            s = ' '.join(value) if isinstance(value, list) else str(value)
            return f'{key} = {s}'.rstrip() + '\n'

        generic_scoped = self.scoped()
        s = f'# Policy {self.policyname} dump\n'
        s += '#\n'
        s += '# Do not parse the contents of this file with automated tools,\n'
        s += '# it is provided for review convenience only.\n'
        s += '#\n'
        s += '# Baseline values for all scopes:\n'
        generic_all = {**generic_scoped.enabled, **generic_scoped.integers}
        for prop_name, value in generic_all.items():
            s += fmt(prop_name, value)
        anything_scope_specific = False
        for scope_name, scope_set in DUMPABLE_SCOPES.items():
            specific_scoped = self.scoped(scopes=scope_set)
            specific_all = {**specific_scoped.enabled,
                            **specific_scoped.integers}
            for prop_name, value in specific_all.items():
                if value != generic_all[prop_name]:
                    if not anything_scope_specific:
                        s += ('# Scope-specific properties '
                              'derived for select backends:\n')
                        anything_scope_specific = True
                    s += fmt(f'{prop_name}@{scope_name}', value)
        if not anything_scope_specific:
            s += '# No scope-specific properties found.\n'
        return s

Youez - 2016 - github.com/yon3zu
LinuXploit