Source code for cheflib.cheflib

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File: cheflib.py
#
# Copyright 2024 Daan de Goede, Costas Tyfoxylos
#
# Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

"""
Main code for cheflib.

.. _Google Python Style Guide:
   https://google.github.io/styleguide/pyguide.html

"""

import logging
from collections.abc import Generator
import concurrent.futures
from typing import Optional

from chefsessionlib import ChefSession
from cheflib.entities import (EntityManager,
                              Client,
                              DataBag,
                              Environment,
                              Node,
                              Role)

from .cheflibexceptions import (CreateFailed)
from .configuration import (ENTITY_URL,
                            MAX_ROW_COUNT)

__author__ = 'Daan de Goede <ddegoede@schubergphilis.com>, Costas Tyfoxylos <ctyfoxylos@schubergphilis.com>'
__docformat__ = 'google'
__date__ = '18-01-2024'
__copyright__ = 'Copyright 2024, Daan de Goede, Costas Tyfoxylos'
__credits__ = ["Daan de Goede", "Costas Tyfoxylos"]
__license__ = 'Apache Software License 2.0'
__maintainer__ = 'Daan de Goede, Costas Tyfoxylos'
__email__ = '<ddegoede@schubergphilis.com>, <ctyfoxylos@schubergphilis.com>'
__status__ = 'Development'  # "Prototype", "Development", "Production".


# This is the main prefix used for logging
LOGGER_BASENAME = 'cheflib'
LOGGER = logging.getLogger(LOGGER_BASENAME)
LOGGER.addHandler(logging.NullHandler())


[docs] class Chef: """Chef api library.""" def __init__(self, endpoint: str, organization: str, user_id: str, private_key_contents: str, client_version: str = '12.0.2', authentication_version: str = '1.3', api_version: int = 1): self._logger = logging.getLogger(f'{LOGGER_BASENAME}.{self.__class__.__name__}') self.base_url = endpoint self.organization = organization self._search_indexes = None self._max_http_workers = 4 self._logger.debug(f'Initializing cheflib with following settings:\n' f'organization: "{self.organization}"\n' f'base_url: "{self.base_url}"\n' f'max_http_workers: {self._max_http_workers}') self.session = self._get_authenticated_session(user_id, private_key_contents, client_version, authentication_version, api_version) def _get_authenticated_session(self, user_id, private_key_contents, client_version, authentication_version, api_version): """Get an authenticated session from ChefSessionLib.""" self._logger.debug('Instantiating ChefSession') session = ChefSession(user_id, private_key_contents, client_version, authentication_version, api_version) # The next call has dual functionality, we test authentication and retrieve the allowed search indexes self._logger.debug('Testing connectivity and gathering allowed search indexes') response = session.get(f'{self._organization_url}/search') self._search_indexes = response.json() return session def _get_paginated_response(self, entity_object, query=None, keys=None, parent_name: str = None, max_row_count: int = MAX_ROW_COUNT) -> Generator: """Get paginated reponse for large results.""" http_method = getattr(self.session, 'post' if keys else 'get') keys = keys or {} params = {'q': query, 'rows': max_row_count, 'start': 0} if query else {} url = self._request_url(entity_object.__name__, params, parent_name) response = http_method(url, params=params, json=keys) if not response.ok: self._logger.warning(f'Unable to retrieve paginated response for "{url}"') self._logger.debug(f'Parameters: {params}, keys: {keys}') yield None response_data = response.json() total = response_data.get('total', 0) row_count = params.get('rows', MAX_ROW_COUNT) self._logger.debug(f'Calculated that there are {row_count} pages to retrieve') yield from response_data.get('rows', response_data.items()) if total > row_count: with concurrent.futures.ThreadPoolExecutor(max_workers=self._max_http_workers) as executor: futures = [] for start in range(row_count, total, row_count): params.update({'start': start}) futures.append(executor.submit(http_method, url, params=params.copy(), json=keys.copy())) for future in concurrent.futures.as_completed(futures): try: response = future.result() response_data = response.json() response.close() yield from response_data.get('rows') except Exception as e: # pylint: disable=broad-except self._logger.exception(f'Future failed...\nreason: {e}') def _create(self, url: str, data: dict) -> dict: """Entity creator function.""" response = self.session.post(url, json=data) if not response.ok: raise CreateFailed(f"Failed to create '{data['name']}, reason:\n{response.text}") return response.json() def _request_url(self, entity: str, params: dict = None, parent_name=None) -> str: """Format the request URL.""" if params: return f'{self._organization_url}/search/{entity.lower()}' return f'{ENTITY_URL[entity.lower()].format(organization_url=self._organization_url, parent_name=parent_name)}' @property def _organization_url(self) -> str: """Generate the organization URL.""" return f'{self.base_url}/organizations/{self.organization}' @property def clients(self) -> EntityManager: """List all clients. Returns: Generator for all clients """ return EntityManager(self, 'Client', self._organization_url)
[docs] def create_client(self, name: str, data: dict = None) -> Optional[Client]: """Create a client, with specified. Args: name: string, name of the client data: (optional) dictionary, containing additional attributes that will be added to the client Returns: New client """ if data is None: data = {} if not name: # log shizzle return None data = data or {'create_key': True} data.update(name=name) resp = self._create(f'{self._organization_url}/clients', data) return Client(self, name, resp['uri'], resp['chef_key'])
[docs] def delete_client_by_name(self, name: str) -> bool: """Deletes a client by name. Args: name: string, name of the client Returns: True if deletion succeeded or False if deletion failed """ client = self.get_client_by_name(name) return client.delete()
[docs] def get_client_by_name(self, name: str) -> Client: """Gets a client by name. Args: name: string, name of the client Returns: First client with specified name """ return next((client for client in self.clients if client.name.lower() == name.lower()), None)
@property def cookbooks(self) -> EntityManager: """List all cookbooks. Returns: Generator for all cookbooks """ return EntityManager(self, 'Cookbook', self._organization_url, 'name') @property def data_bags(self) -> EntityManager: """List all data bags. Returns: Generator for all data bags """ return EntityManager(self, 'DataBag', self._organization_url, 'name')
[docs] def create_data_bag(self, name: str) -> Optional[DataBag]: """Create a data bag. Args: name: string, name of the data bag Returns: New data bag """ if not name: # log shizzle return None data = {} data.update(name=name) resp = self._create(f'{self._organization_url}/data', data) return DataBag(self, name, resp['uri'])
[docs] def get_data_bag_by_name(self, name: str) -> DataBag: """Gets a data bag by name. Args: name: string, name of the data bag Returns: First data bag with specified name """ return next((data_bag for data_bag in self.data_bags if data_bag.name.lower() == name.lower()), None)
[docs] def get_data_bag_item_by_name(self, bag_name: str, name: str, secret: bytes = None) -> DataBag: """Gets a data bag item by name. Args: bag_name: string, name of the data bag name: string, name of the item secret: (optional) bytes, secret key to encrypt/decrypt the data bag Returns: Data bag item with specified name """ db = self.get_data_bag_by_name(bag_name) return db.get_item_by_name(name, secret)
@property def environments(self) -> EntityManager: """List all environments. Returns: Generator for all environments """ return EntityManager(self, 'Environment', self._organization_url, 'name')
[docs] def create_environment(self, name: str, data: dict = None) -> Environment: """Create a environment, with specified name. Args: name: string, name of the environment data: (optional) dictionary, containing additional attributes that will be added to the environment Returns: New environment """ data = data or {} data.update(name=name) resp = self._create(f'{self._organization_url}/environments', data) return Environment(self, name, resp['uri'])
[docs] def delete_environment_by_name(self, name: str) -> bool: """Delete environment with specifief name. Args: name: string, name of environment Returns: True if deletion succeeded or False if deletion failed """ environment = self.get_environment_by_name(name) return environment.delete()
[docs] def get_environment_by_name(self, name: str) -> Environment: """Get environment with specified name. Args: name: string, name of the environment Returns: First environment with specified name """ return next((environment for environment in self.environments if environment.name.lower() == name.lower()), None)
@property def nodes(self) -> EntityManager: """List all nodes. Returns: Generator for all nodes """ return EntityManager(self, 'Node', self._organization_url, 'name')
[docs] def create_node(self, name: str, data: dict = None) -> Node: """Create a node, with specified name. Args: name: string, Name of the node data: (optional) dictionary, containing additional attributes that will be added to the node Returns: New node """ data = data or {} data.update({'name': name}) resp = self._create(f'{self._organization_url}/nodes', data) return Node(self, name, resp['uri'])
[docs] def delete_node_by_name(self, name: str) -> bool: """Delete node with specified name. Args: name: string, name of the node Returns: True if deletion succeeded or False if deletion failed """ node = self.get_node_by_name(name) return node.delete()
[docs] def search_nodes(self, query: str = '*:*', keys: dict = None): """Search nodes, full and partial search supported. When a keys dictionary is provided, only those attributes will be returned. See https://docs.chef.io/chef_search/ for more details. Args: query: (optional) string containing the search query keys: (optional) dictionary, containing the returned attributes (partial search) Returns: Generator for all nodes matching the search """ return EntityManager(self, 'Node', self._organization_url, 'name').filter(query, keys)
[docs] def get_node_by_name(self, name: str) -> Node: """Get node with specified name. Args: name: string, name of the node Returns: First node with specified name """ return next((node for node in self.nodes if node.name.lower() == name.lower()), None)
[docs] def get_node_by_ip_address(self, ipaddress: str) -> Node: """Get node with specified IP address. Args: ipaddress: string, IP address of the node Returns: First node with specified IP address """ return next(EntityManager(self, 'Node', self._organization_url, 'name').filter(f'ipaddress:{ipaddress}'))
@property def roles(self) -> EntityManager: """List all roles. Returns: Generator for all roles """ return EntityManager(self, 'Role', self._organization_url, 'name')
[docs] def create_role(self, name: str, data: dict = None) -> Role: """Create a role, with specified name. Args: name: string, name of the role data: (optional) dictionary, containing additional attributes that will be added to the role. Returns: New role """ data = data or {} data.update({'name': name}) resp = self._create(f'{self._organization_url}/roles', data) return Role(self, name, resp['uri'])
[docs] def delete_role_by_name(self, name: str) -> bool: """Delete role with specified name. Args: name: string, name of the role Returns: True if deletion succeeded or False if deletion failed """ role = self.get_role_by_name(name) return role.delete()
[docs] def get_role_by_name(self, name: str) -> Role: """Get role with specified name. Args: name: string, name of the role Returns: First role with specified name """ return next((role for role in self.roles if role.name.lower() == name.lower()), None)
[docs] def raw(self, uri, method='get', **kwargs): """Raw API calls to chef API. See https://docs.chef.io/workstation/knife_raw/ for more details. Args: uri: string, uri part of the API call method: (optional) string, GET, PUT, DELETE or POST **kwargs: (optional) additional keyword arguments Returns: JSON response from chef API """ url = f'{self._organization_url}/{uri}' self._logger.debug(f'Performing RAW api call, using method "{method.lower()}" to URL "{url}"') response = getattr(self, method.lower())(url, **kwargs) return response.json()