mirror of
https://github.com/ansible/ansible.git
synced 2025-11-30 23:16:08 +07:00
ansible-test - Update Ansible Core CI auth (#85717)
Implement new authentication methods for accessing the Ansible Core CI service.
This commit is contained in:
2
changelogs/fragments/ansible-test-auth-update.yml
Normal file
2
changelogs/fragments/ansible-test-auth-update.yml
Normal file
@@ -0,0 +1,2 @@
|
||||
minor_changes:
|
||||
- ansible-test - Implement new authentication methods for accessing the Ansible Core CI service.
|
||||
@@ -3,22 +3,13 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import abc
|
||||
import base64
|
||||
import dataclasses
|
||||
import datetime
|
||||
import json
|
||||
import os
|
||||
import pathlib
|
||||
import tempfile
|
||||
import typing as t
|
||||
|
||||
from ..encoding import (
|
||||
to_bytes,
|
||||
to_text,
|
||||
)
|
||||
|
||||
from ..io import (
|
||||
read_text_file,
|
||||
write_text_file,
|
||||
)
|
||||
|
||||
from ..config import (
|
||||
CommonConfig,
|
||||
TestConfig,
|
||||
@@ -34,6 +25,65 @@ from ..util import (
|
||||
)
|
||||
|
||||
|
||||
@dataclasses.dataclass(frozen=True, kw_only=True)
|
||||
class AuthContext:
|
||||
"""Information about the request to which authentication will be applied."""
|
||||
|
||||
stage: str
|
||||
provider: str
|
||||
request_id: str
|
||||
|
||||
|
||||
class AuthHelper:
|
||||
"""Authentication helper."""
|
||||
|
||||
NAMESPACE: t.ClassVar = 'ci@core.ansible.com'
|
||||
|
||||
def __init__(self, key_file: pathlib.Path) -> None:
|
||||
self.private_key_file = pathlib.Path(str(key_file).removesuffix('.pub'))
|
||||
self.public_key_file = pathlib.Path(f'{self.private_key_file}.pub')
|
||||
|
||||
def sign_request(self, request: dict[str, object], context: AuthContext) -> None:
|
||||
"""Sign the given auth request using the provided context."""
|
||||
request.update(
|
||||
stage=context.stage,
|
||||
provider=context.provider,
|
||||
request_id=context.request_id,
|
||||
timestamp=datetime.datetime.now(tz=datetime.timezone.utc).replace(microsecond=0).isoformat(),
|
||||
)
|
||||
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
payload_path = pathlib.Path(temp_dir) / 'auth.json'
|
||||
payload_path.write_text(json.dumps(request, sort_keys=True))
|
||||
|
||||
cmd = ['ssh-keygen', '-q', '-Y', 'sign', '-f', str(self.private_key_file), '-n', self.NAMESPACE, str(payload_path)]
|
||||
raw_command(cmd, capture=False, interactive=True)
|
||||
|
||||
signature_path = pathlib.Path(f'{payload_path}.sig')
|
||||
signature = signature_path.read_text()
|
||||
|
||||
request.update(signature=signature)
|
||||
|
||||
|
||||
class GeneratingAuthHelper(AuthHelper, metaclass=abc.ABCMeta):
|
||||
"""Authentication helper which generates a key pair on demand."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__(pathlib.Path('~/.ansible/test/ansible-core-ci').expanduser())
|
||||
|
||||
def sign_request(self, request: dict[str, object], context: AuthContext) -> None:
|
||||
if not self.private_key_file.exists():
|
||||
self.generate_key_pair()
|
||||
|
||||
super().sign_request(request, context)
|
||||
|
||||
def generate_key_pair(self) -> None:
|
||||
"""Generate key pair."""
|
||||
self.private_key_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
raw_command(['ssh-keygen', '-q', '-f', str(self.private_key_file), '-N', ''], capture=True)
|
||||
|
||||
|
||||
class ChangeDetectionNotSupported(ApplicationError):
|
||||
"""Exception for cases where change detection is not supported."""
|
||||
|
||||
@@ -75,8 +125,8 @@ class CIProvider(metaclass=abc.ABCMeta):
|
||||
"""Return True if Ansible Core CI is supported."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def prepare_core_ci_auth(self) -> dict[str, t.Any]:
|
||||
"""Return authentication details for Ansible Core CI."""
|
||||
def prepare_core_ci_request(self, config: dict[str, object], context: AuthContext) -> dict[str, object]:
|
||||
"""Prepare an Ansible Core CI request using the given config and context."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_git_details(self, args: CommonConfig) -> t.Optional[dict[str, t.Any]]:
|
||||
@@ -101,119 +151,3 @@ def get_ci_provider() -> CIProvider:
|
||||
display.info('Detected CI provider: %s' % provider.name)
|
||||
|
||||
return provider
|
||||
|
||||
|
||||
class AuthHelper(metaclass=abc.ABCMeta):
|
||||
"""Public key based authentication helper for Ansible Core CI."""
|
||||
|
||||
def sign_request(self, request: dict[str, t.Any]) -> None:
|
||||
"""Sign the given auth request and make the public key available."""
|
||||
payload_bytes = to_bytes(json.dumps(request, sort_keys=True))
|
||||
signature_raw_bytes = self.sign_bytes(payload_bytes)
|
||||
signature = to_text(base64.b64encode(signature_raw_bytes))
|
||||
|
||||
request.update(signature=signature)
|
||||
|
||||
def initialize_private_key(self) -> str:
|
||||
"""
|
||||
Initialize and publish a new key pair (if needed) and return the private key.
|
||||
The private key is cached across ansible-test invocations, so it is only generated and published once per CI job.
|
||||
"""
|
||||
path = os.path.expanduser('~/.ansible-core-ci-private.key')
|
||||
|
||||
if os.path.exists(to_bytes(path)):
|
||||
private_key_pem = read_text_file(path)
|
||||
else:
|
||||
private_key_pem = self.generate_private_key()
|
||||
write_text_file(path, private_key_pem)
|
||||
|
||||
return private_key_pem
|
||||
|
||||
@abc.abstractmethod
|
||||
def sign_bytes(self, payload_bytes: bytes) -> bytes:
|
||||
"""Sign the given payload and return the signature, initializing a new key pair if required."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def publish_public_key(self, public_key_pem: str) -> None:
|
||||
"""Publish the given public key."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def generate_private_key(self) -> str:
|
||||
"""Generate a new key pair, publishing the public key and returning the private key."""
|
||||
|
||||
|
||||
class CryptographyAuthHelper(AuthHelper, metaclass=abc.ABCMeta):
|
||||
"""Cryptography based public key based authentication helper for Ansible Core CI."""
|
||||
|
||||
def sign_bytes(self, payload_bytes: bytes) -> bytes:
|
||||
"""Sign the given payload and return the signature, initializing a new key pair if required."""
|
||||
# import cryptography here to avoid overhead and failures in environments which do not use/provide it
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
from cryptography.hazmat.primitives.serialization import load_pem_private_key
|
||||
|
||||
private_key_pem = self.initialize_private_key()
|
||||
private_key = load_pem_private_key(to_bytes(private_key_pem), None, default_backend())
|
||||
|
||||
assert isinstance(private_key, ec.EllipticCurvePrivateKey)
|
||||
|
||||
signature_raw_bytes = private_key.sign(payload_bytes, ec.ECDSA(hashes.SHA256()))
|
||||
|
||||
return signature_raw_bytes
|
||||
|
||||
def generate_private_key(self) -> str:
|
||||
"""Generate a new key pair, publishing the public key and returning the private key."""
|
||||
# import cryptography here to avoid overhead and failures in environments which do not use/provide it
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
|
||||
private_key = ec.generate_private_key(ec.SECP384R1(), default_backend())
|
||||
public_key = private_key.public_key()
|
||||
|
||||
private_key_pem = to_text(private_key.private_bytes( # type: ignore[attr-defined] # documented method, but missing from type stubs
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.PKCS8,
|
||||
encryption_algorithm=serialization.NoEncryption(),
|
||||
))
|
||||
|
||||
public_key_pem = to_text(public_key.public_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PublicFormat.SubjectPublicKeyInfo,
|
||||
))
|
||||
|
||||
self.publish_public_key(public_key_pem)
|
||||
|
||||
return private_key_pem
|
||||
|
||||
|
||||
class OpenSSLAuthHelper(AuthHelper, metaclass=abc.ABCMeta):
|
||||
"""OpenSSL based public key based authentication helper for Ansible Core CI."""
|
||||
|
||||
def sign_bytes(self, payload_bytes: bytes) -> bytes:
|
||||
"""Sign the given payload and return the signature, initializing a new key pair if required."""
|
||||
private_key_pem = self.initialize_private_key()
|
||||
|
||||
with tempfile.NamedTemporaryFile() as private_key_file:
|
||||
private_key_file.write(to_bytes(private_key_pem))
|
||||
private_key_file.flush()
|
||||
|
||||
with tempfile.NamedTemporaryFile() as payload_file:
|
||||
payload_file.write(payload_bytes)
|
||||
payload_file.flush()
|
||||
|
||||
with tempfile.NamedTemporaryFile() as signature_file:
|
||||
raw_command(['openssl', 'dgst', '-sha256', '-sign', private_key_file.name, '-out', signature_file.name, payload_file.name], capture=True)
|
||||
signature_raw_bytes = signature_file.read()
|
||||
|
||||
return signature_raw_bytes
|
||||
|
||||
def generate_private_key(self) -> str:
|
||||
"""Generate a new key pair, publishing the public key and returning the private key."""
|
||||
private_key_pem = raw_command(['openssl', 'ecparam', '-genkey', '-name', 'secp384r1', '-noout'], capture=True)[0]
|
||||
public_key_pem = raw_command(['openssl', 'ec', '-pubout'], data=private_key_pem, capture=True)[0]
|
||||
|
||||
self.publish_public_key(public_key_pem)
|
||||
|
||||
return private_key_pem
|
||||
|
||||
@@ -31,9 +31,10 @@ from ..util import (
|
||||
)
|
||||
|
||||
from . import (
|
||||
AuthContext,
|
||||
ChangeDetectionNotSupported,
|
||||
CIProvider,
|
||||
CryptographyAuthHelper,
|
||||
GeneratingAuthHelper,
|
||||
)
|
||||
|
||||
CODE = 'azp'
|
||||
@@ -112,10 +113,11 @@ class AzurePipelines(CIProvider):
|
||||
"""Return True if Ansible Core CI is supported."""
|
||||
return True
|
||||
|
||||
def prepare_core_ci_auth(self) -> dict[str, t.Any]:
|
||||
"""Return authentication details for Ansible Core CI."""
|
||||
def prepare_core_ci_request(self, config: dict[str, object], context: AuthContext) -> dict[str, object]:
|
||||
try:
|
||||
request = dict(
|
||||
request: dict[str, object] = dict(
|
||||
type="azp:ssh",
|
||||
config=config,
|
||||
org_name=os.environ['SYSTEM_COLLECTIONURI'].strip('/').split('/')[-1],
|
||||
project_name=os.environ['SYSTEM_TEAMPROJECT'],
|
||||
build_id=int(os.environ['BUILD_BUILDID']),
|
||||
@@ -124,13 +126,9 @@ class AzurePipelines(CIProvider):
|
||||
except KeyError as ex:
|
||||
raise MissingEnvironmentVariable(name=ex.args[0]) from None
|
||||
|
||||
self.auth.sign_request(request)
|
||||
self.auth.sign_request(request, context)
|
||||
|
||||
auth = dict(
|
||||
azp=request,
|
||||
)
|
||||
|
||||
return auth
|
||||
return request
|
||||
|
||||
def get_git_details(self, args: CommonConfig) -> t.Optional[dict[str, t.Any]]:
|
||||
"""Return details about git in the current environment."""
|
||||
@@ -144,14 +142,14 @@ class AzurePipelines(CIProvider):
|
||||
return details
|
||||
|
||||
|
||||
class AzurePipelinesAuthHelper(CryptographyAuthHelper):
|
||||
"""
|
||||
Authentication helper for Azure Pipelines.
|
||||
Based on cryptography since it is provided by the default Azure Pipelines environment.
|
||||
"""
|
||||
class AzurePipelinesAuthHelper(GeneratingAuthHelper):
|
||||
"""Authentication helper for Azure Pipelines."""
|
||||
|
||||
def generate_key_pair(self) -> None:
|
||||
super().generate_key_pair()
|
||||
|
||||
public_key_pem = self.public_key_file.read_text()
|
||||
|
||||
def publish_public_key(self, public_key_pem: str) -> None:
|
||||
"""Publish the given public key."""
|
||||
try:
|
||||
agent_temp_directory = os.environ['AGENT_TEMPDIRECTORY']
|
||||
except KeyError as ex:
|
||||
|
||||
@@ -2,10 +2,12 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import abc
|
||||
import inspect
|
||||
import platform
|
||||
import random
|
||||
import re
|
||||
import pathlib
|
||||
import typing as t
|
||||
|
||||
from ..config import (
|
||||
@@ -24,11 +26,14 @@ from ..git import (
|
||||
from ..util import (
|
||||
ApplicationError,
|
||||
display,
|
||||
get_subclasses,
|
||||
is_binary_file,
|
||||
SubprocessError,
|
||||
)
|
||||
|
||||
from . import (
|
||||
AuthContext,
|
||||
AuthHelper,
|
||||
CIProvider,
|
||||
)
|
||||
|
||||
@@ -120,34 +125,20 @@ class Local(CIProvider):
|
||||
|
||||
def supports_core_ci_auth(self) -> bool:
|
||||
"""Return True if Ansible Core CI is supported."""
|
||||
path = self._get_aci_key_path()
|
||||
return os.path.exists(path)
|
||||
return Authenticator.available()
|
||||
|
||||
def prepare_core_ci_auth(self) -> dict[str, t.Any]:
|
||||
"""Return authentication details for Ansible Core CI."""
|
||||
path = self._get_aci_key_path()
|
||||
auth_key = read_text_file(path).strip()
|
||||
def prepare_core_ci_request(self, config: dict[str, object], context: AuthContext) -> dict[str, object]:
|
||||
if not (authenticator := Authenticator.load()):
|
||||
raise ApplicationError('Ansible Core CI authentication has not been configured.')
|
||||
|
||||
request = dict(
|
||||
key=auth_key,
|
||||
nonce=None,
|
||||
)
|
||||
display.info(f'Using {authenticator} for Ansible Core CI.', verbosity=1)
|
||||
|
||||
auth = dict(
|
||||
remote=request,
|
||||
)
|
||||
|
||||
return auth
|
||||
return authenticator.prepare_auth_request(config, context)
|
||||
|
||||
def get_git_details(self, args: CommonConfig) -> t.Optional[dict[str, t.Any]]:
|
||||
"""Return details about git in the current environment."""
|
||||
return None # not yet implemented for local
|
||||
|
||||
@staticmethod
|
||||
def _get_aci_key_path() -> str:
|
||||
path = os.path.expanduser('~/.ansible-core-ci.key')
|
||||
return path
|
||||
|
||||
|
||||
class InvalidBranch(ApplicationError):
|
||||
"""Exception for invalid branch specification."""
|
||||
@@ -214,3 +205,108 @@ class LocalChanges:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
class Authenticator(metaclass=abc.ABCMeta):
|
||||
"""Base class for authenticators."""
|
||||
|
||||
@staticmethod
|
||||
def list() -> list[type[Authenticator]]:
|
||||
"""List all authenticators in priority order."""
|
||||
return sorted((sc for sc in get_subclasses(Authenticator) if not inspect.isabstract(sc)), key=lambda obj: obj.priority())
|
||||
|
||||
@staticmethod
|
||||
def load() -> Authenticator | None:
|
||||
"""Load an authenticator instance, returning None if not configured."""
|
||||
for implementation in Authenticator.list():
|
||||
if implementation.config_file().exists():
|
||||
return implementation()
|
||||
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def available() -> bool:
|
||||
"""Return True if an authenticator is available, otherwise False."""
|
||||
return bool(Authenticator.load())
|
||||
|
||||
@classmethod
|
||||
@abc.abstractmethod
|
||||
def priority(cls) -> int:
|
||||
"""Priority used to determine which authenticator is tried first, from lowest to highest."""
|
||||
|
||||
@classmethod
|
||||
@abc.abstractmethod
|
||||
def config_file(cls) -> pathlib.Path:
|
||||
"""Path to the config file for this authenticator."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def prepare_auth_request(self, config: dict[str, object], context: AuthContext) -> dict[str, object]:
|
||||
"""Prepare an authenticated Ansible Core CI request using the given config and context."""
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self.__class__.__name__
|
||||
|
||||
|
||||
class PasswordAuthenticator(Authenticator):
|
||||
"""Authenticate using a password."""
|
||||
|
||||
@classmethod
|
||||
def priority(cls) -> int:
|
||||
return 200
|
||||
|
||||
@classmethod
|
||||
def config_file(cls) -> pathlib.Path:
|
||||
return pathlib.Path('~/.ansible-core-ci.key').expanduser()
|
||||
|
||||
def prepare_auth_request(self, config: dict[str, object], context: AuthContext) -> dict[str, object]:
|
||||
parts = self.config_file().read_text().strip().split(maxsplit=1)
|
||||
|
||||
if len(parts) == 1: # temporary backward compatibility for legacy API keys
|
||||
request = dict(
|
||||
config=config,
|
||||
auth=dict(
|
||||
remote=dict(
|
||||
key=parts[0],
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
return request
|
||||
|
||||
username, password = parts
|
||||
|
||||
request = dict(
|
||||
type="remote:password",
|
||||
config=config,
|
||||
username=username,
|
||||
password=password,
|
||||
)
|
||||
|
||||
return request
|
||||
|
||||
|
||||
class SshAuthenticator(Authenticator):
|
||||
"""Authenticate using an SSH key."""
|
||||
|
||||
@classmethod
|
||||
def priority(cls) -> int:
|
||||
return 100
|
||||
|
||||
@classmethod
|
||||
def config_file(cls) -> pathlib.Path:
|
||||
return pathlib.Path('~/.ansible-core-ci.auth').expanduser()
|
||||
|
||||
def prepare_auth_request(self, config: dict[str, object], context: AuthContext) -> dict[str, object]:
|
||||
parts = self.config_file().read_text().strip().split(maxsplit=1)
|
||||
username, key_file = parts
|
||||
|
||||
request: dict[str, object] = dict(
|
||||
type="remote:ssh",
|
||||
config=config,
|
||||
username=username,
|
||||
)
|
||||
|
||||
auth_helper = AuthHelper(pathlib.Path(key_file).expanduser())
|
||||
auth_helper.sign_request(request, context)
|
||||
|
||||
return request
|
||||
|
||||
@@ -42,6 +42,7 @@ from .config import (
|
||||
)
|
||||
|
||||
from .ci import (
|
||||
AuthContext,
|
||||
get_ci_provider,
|
||||
)
|
||||
|
||||
@@ -68,6 +69,10 @@ class Resource(metaclass=abc.ABCMeta):
|
||||
def persist(self) -> bool:
|
||||
"""True if the resource is persistent, otherwise false."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_config(self, core_ci: AnsibleCoreCI) -> dict[str, object]:
|
||||
"""Return the configuration for this resource."""
|
||||
|
||||
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
class VmResource(Resource):
|
||||
@@ -92,6 +97,16 @@ class VmResource(Resource):
|
||||
"""True if the resource is persistent, otherwise false."""
|
||||
return True
|
||||
|
||||
def get_config(self, core_ci: AnsibleCoreCI) -> dict[str, object]:
|
||||
"""Return the configuration for this resource."""
|
||||
return dict(
|
||||
type="vm",
|
||||
platform=self.platform,
|
||||
version=self.version,
|
||||
architecture=self.architecture,
|
||||
public_key=core_ci.ssh_key.pub_contents,
|
||||
)
|
||||
|
||||
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
class CloudResource(Resource):
|
||||
@@ -112,6 +127,12 @@ class CloudResource(Resource):
|
||||
"""True if the resource is persistent, otherwise false."""
|
||||
return False
|
||||
|
||||
def get_config(self, core_ci: AnsibleCoreCI) -> dict[str, object]:
|
||||
"""Return the configuration for this resource."""
|
||||
return dict(
|
||||
type="cloud",
|
||||
)
|
||||
|
||||
|
||||
class AnsibleCoreCI:
|
||||
"""Client for Ansible Core CI services."""
|
||||
@@ -189,7 +210,7 @@ class AnsibleCoreCI:
|
||||
display.info(f'Skipping started {self.label} instance.', verbosity=1)
|
||||
return None
|
||||
|
||||
return self._start(self.ci_provider.prepare_core_ci_auth())
|
||||
return self._start()
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Stop instance."""
|
||||
@@ -288,26 +309,25 @@ class AnsibleCoreCI:
|
||||
def _uri(self) -> str:
|
||||
return f'{self.endpoint}/{self.stage}/{self.provider}/{self.instance_id}'
|
||||
|
||||
def _start(self, auth) -> dict[str, t.Any]:
|
||||
def _start(self) -> dict[str, t.Any]:
|
||||
"""Start instance."""
|
||||
display.info(f'Initializing new {self.label} instance using: {self._uri}', verbosity=1)
|
||||
|
||||
data = dict(
|
||||
config=dict(
|
||||
platform=self.platform,
|
||||
version=self.version,
|
||||
architecture=self.arch,
|
||||
public_key=self.ssh_key.pub_contents,
|
||||
)
|
||||
config = self.resource.get_config(self)
|
||||
|
||||
context = AuthContext(
|
||||
request_id=self.instance_id,
|
||||
stage=self.stage,
|
||||
provider=self.provider,
|
||||
)
|
||||
|
||||
data.update(auth=auth)
|
||||
request = self.ci_provider.prepare_core_ci_request(config, context)
|
||||
|
||||
headers = {
|
||||
'Content-Type': 'application/json',
|
||||
}
|
||||
|
||||
response = self._start_endpoint(data, headers)
|
||||
response = self._start_endpoint(request, headers)
|
||||
|
||||
self.started = True
|
||||
self._save()
|
||||
|
||||
@@ -260,6 +260,9 @@ class HostProfile[THostConfig: HostConfig](metaclass=abc.ABCMeta):
|
||||
def name(self) -> str:
|
||||
"""The name of the host profile."""
|
||||
|
||||
def pre_provision(self) -> None:
|
||||
"""Pre-provision the host profile."""
|
||||
|
||||
def provision(self) -> None:
|
||||
"""Provision the host before delegation."""
|
||||
|
||||
@@ -517,8 +520,8 @@ class RemoteProfile[TRemoteConfig: RemoteConfig](SshTargetHostProfile[TRemoteCon
|
||||
"""The saved Ansible Core CI state."""
|
||||
self.state['core_ci'] = value
|
||||
|
||||
def provision(self) -> None:
|
||||
"""Provision the host before delegation."""
|
||||
def pre_provision(self) -> None:
|
||||
"""Pre-provision the host before delegation."""
|
||||
self.core_ci = self.create_core_ci(load=True)
|
||||
self.core_ci.start()
|
||||
|
||||
|
||||
@@ -129,6 +129,9 @@ def prepare_profiles[TEnvironmentConfig: EnvironmentConfig](
|
||||
|
||||
ExitHandler.register(functools.partial(cleanup_profiles, host_state))
|
||||
|
||||
for pre_profile in host_state.profiles:
|
||||
pre_profile.pre_provision()
|
||||
|
||||
def provision(profile: HostProfile) -> None:
|
||||
"""Provision the given profile."""
|
||||
profile.provision()
|
||||
|
||||
@@ -702,6 +702,7 @@ def common_environment() -> dict[str, str]:
|
||||
optional = (
|
||||
'LD_LIBRARY_PATH',
|
||||
'SSH_AUTH_SOCK',
|
||||
'SSH_SK_PROVIDER',
|
||||
# MacOS High Sierra Compatibility
|
||||
# http://sealiesoftware.com/blog/archive/2017/6/5/Objective-C_and_fork_in_macOS_1013.html
|
||||
# Example configuration for macOS:
|
||||
|
||||
@@ -1,30 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from .util import common_auth_test
|
||||
|
||||
|
||||
def test_auth():
|
||||
# noinspection PyProtectedMember
|
||||
from ansible_test._internal.ci.azp import (
|
||||
AzurePipelinesAuthHelper,
|
||||
)
|
||||
|
||||
class TestAzurePipelinesAuthHelper(AzurePipelinesAuthHelper):
|
||||
def __init__(self):
|
||||
self.public_key_pem = None
|
||||
self.private_key_pem = None
|
||||
|
||||
def publish_public_key(self, public_key_pem):
|
||||
# avoid publishing key
|
||||
self.public_key_pem = public_key_pem
|
||||
|
||||
def initialize_private_key(self):
|
||||
# cache in memory instead of on disk
|
||||
if not self.private_key_pem:
|
||||
self.private_key_pem = self.generate_private_key()
|
||||
|
||||
return self.private_key_pem
|
||||
|
||||
auth = TestAzurePipelinesAuthHelper()
|
||||
|
||||
common_auth_test(auth)
|
||||
@@ -1,50 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import json
|
||||
import re
|
||||
|
||||
|
||||
def common_auth_test(auth):
|
||||
private_key_pem = auth.initialize_private_key()
|
||||
public_key_pem = auth.public_key_pem
|
||||
|
||||
extract_pem_key(private_key_pem, private=True)
|
||||
extract_pem_key(public_key_pem, private=False)
|
||||
|
||||
request = dict(hello='World')
|
||||
auth.sign_request(request)
|
||||
|
||||
verify_signature(request, public_key_pem)
|
||||
|
||||
|
||||
def extract_pem_key(value, private):
|
||||
assert isinstance(value, type(u''))
|
||||
|
||||
key_type = '(EC )?PRIVATE' if private else 'PUBLIC'
|
||||
pattern = r'^-----BEGIN ' + key_type + r' KEY-----\n(?P<key>.*?)\n-----END ' + key_type + r' KEY-----\n$'
|
||||
match = re.search(pattern, value, flags=re.DOTALL)
|
||||
|
||||
assert match, 'key "%s" does not match pattern "%s"' % (value, pattern)
|
||||
|
||||
base64.b64decode(match.group('key')) # make sure the key can be decoded
|
||||
|
||||
|
||||
def verify_signature(request, public_key_pem):
|
||||
signature = request.pop('signature')
|
||||
payload_bytes = json.dumps(request, sort_keys=True).encode()
|
||||
|
||||
assert isinstance(signature, type(u''))
|
||||
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
from cryptography.hazmat.primitives.serialization import load_pem_public_key
|
||||
|
||||
public_key = load_pem_public_key(public_key_pem.encode(), default_backend())
|
||||
|
||||
public_key.verify(
|
||||
base64.b64decode(signature.encode()),
|
||||
payload_bytes,
|
||||
ec.ECDSA(hashes.SHA256()),
|
||||
)
|
||||
Reference in New Issue
Block a user