Modify crypt library import pattern (#86120)

Hide the functionality of the _internal/_encryption/_crypt.py module
behind an object so that we don't have code executed at import time.
This commit is contained in:
David Shrewsbury
2025-11-11 17:33:45 -05:00
committed by GitHub
parent e0f61dfce4
commit 83d2ce771c
4 changed files with 272 additions and 111 deletions

View File

@@ -7,9 +7,10 @@ import ctypes
import ctypes.util
import os
import sys
import typing as t
from dataclasses import dataclass
__all__ = ['CRYPT_NAME', 'crypt', 'crypt_gensalt', 'HAS_CRYPT_GENSALT']
__all__ = ['CryptFacade']
_FAILURE_TOKENS = frozenset({b'*0', b'*1'})
@@ -37,123 +38,133 @@ _CRYPT_LIBS = (
),
)
for _lib_config in _CRYPT_LIBS:
if sys.platform in _lib_config.exclude_platforms:
continue
if _lib_config.include_platforms and sys.platform not in _lib_config.include_platforms:
continue
if _lib_config.name is None:
_lib_so = None
elif _lib_config.is_path:
if os.path.exists(_lib_config.name):
_lib_so = _lib_config.name
class CryptFacade:
"""
Provide an interface for various crypt libraries that might be available.
"""
def __init__(self) -> None:
self._crypt_impl: t.Callable | None = None
self._crypt_gensalt_impl: t.Callable | None = None
self._use_crypt_r = False
self._use_crypt_gensalt_rn = False
self._crypt_name = ""
self._setup()
class _CryptData(ctypes.Structure):
_fields_ = [('_opaque', ctypes.c_char * 131072)]
@property
def has_crypt_gensalt(self) -> bool:
return self._crypt_gensalt_impl is not None
def _setup(self) -> None:
"""Setup crypt implementation"""
for lib_config in _CRYPT_LIBS:
if sys.platform in lib_config.exclude_platforms:
continue
if lib_config.include_platforms and sys.platform not in lib_config.include_platforms:
continue
if lib_config.name is None:
lib_so = None
elif lib_config.is_path:
if os.path.exists(lib_config.name):
lib_so = lib_config.name
else:
continue
else:
lib_so = ctypes.util.find_library(lib_config.name)
if not lib_so:
continue
loaded_lib = ctypes.cdll.LoadLibrary(lib_so)
try:
self._crypt_impl = loaded_lib.crypt_r
self._use_crypt_r = True
except AttributeError:
try:
self._crypt_impl = loaded_lib.crypt
except AttributeError:
continue
if self._use_crypt_r:
self._crypt_impl.argtypes = [ctypes.c_char_p, ctypes.c_char_p, ctypes.POINTER(self._CryptData)]
self._crypt_impl.restype = ctypes.c_char_p
else:
self._crypt_impl.argtypes = [ctypes.c_char_p, ctypes.c_char_p]
self._crypt_impl.restype = ctypes.c_char_p
# Try to load crypt_gensalt (available in libxcrypt)
try:
self._crypt_gensalt_impl = loaded_lib.crypt_gensalt_rn
self._crypt_gensalt_impl.argtypes = [ctypes.c_char_p, ctypes.c_ulong, ctypes.c_char_p, ctypes.c_int, ctypes.c_char_p, ctypes.c_int]
self._crypt_gensalt_impl.restype = ctypes.c_char_p
self._use_crypt_gensalt_rn = True
except AttributeError:
try:
self._crypt_gensalt_impl = loaded_lib.crypt_gensalt
self._crypt_gensalt_impl.argtypes = [ctypes.c_char_p, ctypes.c_ulong, ctypes.c_char_p, ctypes.c_int]
self._crypt_gensalt_impl.restype = ctypes.c_char_p
except AttributeError:
self._crypt_gensalt_impl = None
self._crypt_name = lib_config.name
break
else:
continue
else:
_lib_so = ctypes.util.find_library(_lib_config.name)
if not _lib_so:
continue
raise ImportError('Cannot find crypt implementation')
_lib = ctypes.cdll.LoadLibrary(_lib_so)
def crypt(self, word: bytes, salt: bytes) -> bytes:
"""Hash a password using the system's crypt function."""
ctypes.set_errno(0)
_use_crypt_r = False
try:
_crypt_impl = _lib.crypt_r
_use_crypt_r = True
except AttributeError:
try:
_crypt_impl = _lib.crypt
except AttributeError:
continue
if self._use_crypt_r:
data = self._CryptData()
ctypes.memset(ctypes.byref(data), 0, ctypes.sizeof(data))
result = self._crypt_impl(word, salt, ctypes.byref(data))
else:
result = self._crypt_impl(word, salt)
if _use_crypt_r:
errno = ctypes.get_errno()
if errno:
error_msg = os.strerror(errno)
raise OSError(errno, f'crypt failed: {error_msg}')
class _crypt_data(ctypes.Structure):
_fields_ = [('_opaque', ctypes.c_char * 131072)]
if result is None:
raise ValueError('crypt failed: invalid salt or unsupported algorithm')
_crypt_impl.argtypes = [ctypes.c_char_p, ctypes.c_char_p, ctypes.POINTER(_crypt_data)]
_crypt_impl.restype = ctypes.c_char_p
else:
_crypt_impl.argtypes = [ctypes.c_char_p, ctypes.c_char_p]
_crypt_impl.restype = ctypes.c_char_p
if result in _FAILURE_TOKENS:
raise ValueError('crypt failed: invalid salt or unsupported algorithm')
# Try to load crypt_gensalt (available in libxcrypt)
_use_crypt_gensalt_rn = False
HAS_CRYPT_GENSALT = False
try:
_crypt_gensalt_impl = _lib.crypt_gensalt_rn
_crypt_gensalt_impl.argtypes = [ctypes.c_char_p, ctypes.c_ulong, ctypes.c_char_p, ctypes.c_int, ctypes.c_char_p, ctypes.c_int]
_crypt_gensalt_impl.restype = ctypes.c_char_p
_use_crypt_gensalt_rn = True
HAS_CRYPT_GENSALT = True
except AttributeError:
try:
_crypt_gensalt_impl = _lib.crypt_gensalt
_crypt_gensalt_impl.argtypes = [ctypes.c_char_p, ctypes.c_ulong, ctypes.c_char_p, ctypes.c_int]
_crypt_gensalt_impl.restype = ctypes.c_char_p
HAS_CRYPT_GENSALT = True
except AttributeError:
_crypt_gensalt_impl = None
return result
CRYPT_NAME = _lib_config.name
break
else:
raise ImportError('Cannot find crypt implementation')
def crypt_gensalt(self, prefix: bytes, count: int, rbytes: bytes) -> bytes:
"""Generate a salt string for use with crypt."""
if not self.has_crypt_gensalt:
raise NotImplementedError('crypt_gensalt not available (requires libxcrypt)')
ctypes.set_errno(0)
def crypt(word: bytes, salt: bytes) -> bytes:
"""Hash a password using the system's crypt function."""
ctypes.set_errno(0)
if self._use_crypt_gensalt_rn:
output = ctypes.create_string_buffer(256)
result = self._crypt_gensalt_impl(prefix, count, rbytes, len(rbytes), output, len(output))
if result is not None:
result = output.value
else:
result = self._crypt_gensalt_impl(prefix, count, rbytes, len(rbytes))
if _use_crypt_r:
data = _crypt_data()
ctypes.memset(ctypes.byref(data), 0, ctypes.sizeof(data))
result = _crypt_impl(word, salt, ctypes.byref(data))
else:
result = _crypt_impl(word, salt)
errno = ctypes.get_errno()
if errno:
error_msg = os.strerror(errno)
raise OSError(errno, f'crypt_gensalt failed: {error_msg}')
errno = ctypes.get_errno()
if errno:
error_msg = os.strerror(errno)
raise OSError(errno, f'crypt failed: {error_msg}')
if result is None:
raise ValueError('crypt_gensalt failed: unable to generate salt')
if result is None:
raise ValueError('crypt failed: invalid salt or unsupported algorithm')
if result in _FAILURE_TOKENS:
raise ValueError('crypt_gensalt failed: invalid prefix or unsupported algorithm')
if result in _FAILURE_TOKENS:
raise ValueError('crypt failed: invalid salt or unsupported algorithm')
return result
def crypt_gensalt(prefix: bytes, count: int, rbytes: bytes) -> bytes:
"""Generate a salt string for use with crypt."""
if not HAS_CRYPT_GENSALT:
raise NotImplementedError('crypt_gensalt not available (requires libxcrypt)')
ctypes.set_errno(0)
if _use_crypt_gensalt_rn:
output = ctypes.create_string_buffer(256)
result = _crypt_gensalt_impl(prefix, count, rbytes, len(rbytes), output, len(output))
if result is not None:
result = output.value
else:
result = _crypt_gensalt_impl(prefix, count, rbytes, len(rbytes))
errno = ctypes.get_errno()
if errno:
error_msg = os.strerror(errno)
raise OSError(errno, f'crypt_gensalt failed: {error_msg}')
if result is None:
raise ValueError('crypt_gensalt failed: unable to generate salt')
if result in _FAILURE_TOKENS:
raise ValueError('crypt_gensalt failed: invalid prefix or unsupported algorithm')
return result
del _lib_config
return result

View File

@@ -38,7 +38,8 @@ except Exception as e:
CRYPT_E = None
HAS_CRYPT = False
try:
from ansible._internal._encryption import _crypt
from ansible._internal._encryption._crypt import CryptFacade
_crypt_facade = CryptFacade()
HAS_CRYPT = True
except Exception as e:
CRYPT_E = e
@@ -121,14 +122,14 @@ class CryptHash(BaseHash):
self.algo_data = self.algorithms[algorithm]
if self.algo_data.requires_gensalt and not _crypt.HAS_CRYPT_GENSALT:
if self.algo_data.requires_gensalt and not _crypt_facade.has_crypt_gensalt:
raise AnsibleError(f"{self.algorithm!r} algorithm requires libxcrypt")
def hash(self, secret: str, salt: str | None = None, salt_size: int | None = None, rounds: int | None = None, ident: str | None = None) -> str:
rounds = self._rounds(rounds)
ident = self._ident(ident)
if _crypt.HAS_CRYPT_GENSALT:
if _crypt_facade.has_crypt_gensalt:
saltstring = self._gensalt(ident, rounds, salt, salt_size)
else:
saltstring = self._build_saltstring(ident, rounds, salt, salt_size)
@@ -174,7 +175,7 @@ class CryptHash(BaseHash):
count = rounds or 0
try:
salt_bytes = _crypt.crypt_gensalt(to_bytes(prefix), count, rbytes)
salt_bytes = _crypt_facade.crypt_gensalt(to_bytes(prefix), count, rbytes)
return to_text(salt_bytes, errors='strict')
except (NotImplementedError, ValueError) as e:
raise AnsibleError(f"Failed to generate salt for {self.algorithm!r} algorithm") from e
@@ -192,7 +193,7 @@ class CryptHash(BaseHash):
def _hash(self, secret: str, saltstring: str) -> str:
try:
result = _crypt.crypt(to_bytes(secret), to_bytes(saltstring))
result = _crypt_facade.crypt(to_bytes(secret), to_bytes(saltstring))
except (OSError, ValueError) as e:
raise AnsibleError(f"crypt does not support {self.algorithm!r} algorithm") from e

View File

@@ -0,0 +1,149 @@
from __future__ import annotations
import errno
import pytest
from pytest_mock import MockerFixture
from ansible._internal._encryption._crypt import _CryptLib, CryptFacade, _FAILURE_TOKENS
class TestCryptFacade:
def test_unsupported_platform(self, mocker: MockerFixture) -> None:
"""Test that unsupported platforms are skipped."""
mock_libs = (
_CryptLib('foo', include_platforms=frozenset({'fake_platform'})),
)
mocker.patch('ansible._internal._encryption._crypt._CRYPT_LIBS', mock_libs)
with pytest.raises(ImportError, match=r'Cannot find crypt implementation'):
CryptFacade()
def test_libc_fallback(self, mocker: MockerFixture) -> None:
"""Test that a library name of None will load the libc library."""
mock_libs = (
_CryptLib(None),
)
mocker.patch('ansible._internal._encryption._crypt._CRYPT_LIBS', mock_libs)
load_lib_mock = mocker.patch('ctypes.cdll.LoadLibrary')
crypt_facade = CryptFacade()
load_lib_mock.assert_called_once_with(None)
assert crypt_facade._crypt_name is None
def test_library_with_no_crypt_methods(self, mocker: MockerFixture) -> None:
"""Test that a library without crypt() and crypt_r() is skipped."""
mock_libs = (
_CryptLib(None),
)
class MockCDLL:
pass
mocker.patch('ansible._internal._encryption._crypt._CRYPT_LIBS', mock_libs)
mocker.patch('ctypes.cdll.LoadLibrary', return_value=MockCDLL())
with pytest.raises(ImportError, match=r'Cannot find crypt implementation'):
CryptFacade()
def test_library_with_no_crypt_r_or_crypt_gensalt_rn(self, mocker: MockerFixture) -> None:
"""Test that a library without crypt_r() or crypt_gensalt_rn() is prepped correctly."""
mock_libs = (
_CryptLib(None),
)
class MockCDLL:
class MockCrypt:
def __init__(self):
self.argtypes = None
self.restype = None
def __init__(self):
self.crypt = self.MockCrypt()
self.crypt_gensalt = self.MockCrypt()
mocker.patch('ansible._internal._encryption._crypt._CRYPT_LIBS', mock_libs)
mocker.patch('ctypes.cdll.LoadLibrary', return_value=MockCDLL())
crypt_facade = CryptFacade()
assert crypt_facade._crypt_impl is not None
assert crypt_facade._crypt_impl.argtypes is not None
assert crypt_facade._crypt_impl.restype is not None
assert crypt_facade._use_crypt_r is False
assert crypt_facade._crypt_gensalt_impl is not None
assert crypt_facade._crypt_gensalt_impl.argtypes is not None
assert crypt_facade._crypt_gensalt_impl.restype is not None
assert crypt_facade._use_crypt_gensalt_rn is False
assert crypt_facade.has_crypt_gensalt
def test_crypt_fail_errno(self, mocker: MockerFixture) -> None:
"""Test crypt() setting failure errno raises OSError."""
mocker.patch('ctypes.get_errno', return_value=errno.EBADFD)
crypt_facade = CryptFacade()
with pytest.raises(OSError, match=r'crypt failed:'):
crypt_facade.crypt(b"test", b"123")
def test_crypt_result_none(self, mocker: MockerFixture) -> None:
"""Test crypt() implementation returning None raises ValueError."""
crypt_facade = CryptFacade()
mocker.patch.object(crypt_facade, '_crypt_impl', return_value=None)
with pytest.raises(ValueError, match=r'crypt failed: invalid salt or unsupported algorithm'):
crypt_facade.crypt(b"test", b"123")
def test_crypt_result_failure(self, mocker: MockerFixture) -> None:
"""Test crypt() implementation returning failure token raises ValueError."""
crypt_facade = CryptFacade()
mocker.patch.object(crypt_facade, '_crypt_impl', return_value=list(_FAILURE_TOKENS)[0])
with pytest.raises(ValueError, match=r'crypt failed: invalid salt or unsupported algorithm'):
crypt_facade.crypt(b"test", b"123")
def test_crypt_gensalt_called_with_no_impl(self, mocker: MockerFixture) -> None:
"""Calling crypt_gensalt() without impl should raise NotImplementedError."""
crypt_facade = CryptFacade()
mock_prop = mocker.patch('ansible._internal._encryption._crypt.CryptFacade.has_crypt_gensalt', new_callable=mocker.PropertyMock)
mock_prop.return_value = False
with pytest.raises(NotImplementedError, match=r'crypt_gensalt not available \(requires libxcrypt\)'):
crypt_facade.crypt_gensalt(b"", 1, b"")
def test_crypt_gensalt(self, mocker: MockerFixture) -> None:
"""Test the NOT _use_crypt_gensalt_rn code path of crypt_gensalt()."""
crypt_facade = CryptFacade()
crypt_facade._use_crypt_gensalt_rn = False
mock_impl = mocker.patch.object(crypt_facade, '_crypt_gensalt_impl', return_value='')
crypt_facade.crypt_gensalt(b'', 1, b'')
mock_impl.assert_called_once_with(b'', 1, b'', 0)
def test_crypt_gensalt_fail_errno(self, mocker: MockerFixture) -> None:
"""Test crypt_gensalt() setting failure errno raises OSError."""
mocker.patch('ctypes.get_errno', return_value=errno.EBADFD)
crypt_facade = CryptFacade()
with pytest.raises(OSError, match=r'crypt_gensalt failed:'):
crypt_facade.crypt_gensalt(b'', 1, b'')
def test_crypt_gensalt_result_none(self, mocker: MockerFixture) -> None:
"""Test crypt_gensalt() implementation returning None raises ValueError."""
crypt_facade = CryptFacade()
mocker.patch.object(crypt_facade, '_crypt_gensalt_impl', return_value=None)
with pytest.raises(ValueError, match=r'crypt_gensalt failed: unable to generate salt'):
crypt_facade.crypt_gensalt(b'', 1, b'')
def test_crypt_gensalt_result_failure(self, mocker: MockerFixture) -> None:
"""Test crypt_gensalt() implementation returning failure token raises ValueError."""
crypt_facade = CryptFacade()
# Skip the _rn version as it modifies impl return value
crypt_facade._use_crypt_gensalt_rn = False
mocker.patch.object(crypt_facade, '_crypt_gensalt_impl', return_value=list(_FAILURE_TOKENS)[0])
with pytest.raises(ValueError, match=r'crypt_gensalt failed: invalid prefix or unsupported algorithm'):
crypt_facade.crypt_gensalt(b'', 1, b'')