Source code for awsfindingsmanagerlib.backends

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File: backends.py
#
# Copyright 2023 Marwin Baumann, Costas Tyfoxylos
#
# Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

"""
Main code for backends.

.. _Google Python Style Guide:
   https://google.github.io/styleguide/pyguide.html

"""

import logging
from abc import ABC, abstractmethod
from typing import List, Dict

import boto3
import requests
import yaml

from .validations import validate_rule_data

__author__ = '''Marwin Baumann <mbaumann@schubergphilis.com>, Costas Tyfoxylos <ctyfoxylos@schubergphilis.com>'''
__docformat__ = '''google'''
__date__ = '''21-11-2023'''
__copyright__ = '''Copyright 2023, Marwin Baumann, Costas Tyfoxylos'''
__credits__ = ["Ben van Breukelen", "Costas Tyfoxylos", "Marwin Baumann"]
__license__ = '''Apache Software License 2.0'''
__maintainer__ = '''Ben van Breukelen, Costas Tyfoxylos, Marwin Baumann'''
__email__ = '''<bvanbreukelen@schubergphilis.com>,<ctyfoxylos@schubergphilis.com>,<mbaumann@schubergphilis.com>'''
__status__ = '''Development'''  # "Prototype", "Development", "Production".

LOGGER_BASENAME = '''backends'''
LOGGER = logging.getLogger(LOGGER_BASENAME)
LOGGER.addHandler(logging.NullHandler())


[docs] class Backend(ABC): @abstractmethod def _get_rules(self): """Retrieves the rules from the backend. Returns: A list of rule data from the backend. """
[docs] def get_rules(self): return [validate_rule_data(data) for data in self._get_rules()]
[docs] class Local(Backend): def __init__(self, path): self.path = path def _get_rules(self): with open(self.path, encoding='utf-8') as suppressions_file: data = yaml.safe_load(suppressions_file) return data.get('Rules')
[docs] class Http(Backend): def __init__(self, url): self.url = url def _get_rules(self): response = requests.get(self.url, timeout=2) response.raise_for_status() data = yaml.safe_load(response.text) return data.get('Rules')
[docs] class DynamoDB(Backend): def __init__(self, dynamodb_table_name) -> None: self._dynamodb_resource = self._get_dynamodb_client() self._table = self._get_dynamodb_table(dynamodb_table_name) @staticmethod def _get_dynamodb_client(): return boto3.client('dynamodb') @staticmethod def _get_dynamodb_table(dynamodb_table_name): dynamodb_resource = DynamoDB._get_dynamodb_client() return dynamodb_resource.Table(name=dynamodb_table_name) def _get_rules(self) -> List[Dict]: response = self._table.scan() data = response['Items'] while 'LastEvaluatedKey' in response: response = self._table.scan(ExclusiveStartKey=response['LastEvaluatedKey']) data.extend(response['Items']) # Here iterate over the data and return the payloads. # Old code follows. # rules = self._suppression_dynamodb_table.get_item(Key={"controlId": self.hash_key}) # for rule in rules.get('Item', {}).get('data', {}): # self._entries.append( # Rule(action=rule.get('action'), # rules=rule.get('rules'), # notes=rule.get('notes'), # dry_run=rule.get('dry_run', False)) # ) # return self._entries return []
[docs] class S3(Backend): def __init__(self, bucket_name, file_name): self._file_contents = self._get_file_contents(bucket_name, file_name) @staticmethod def _get_file_contents(bucket_name, file_name): s3 = boto3.resource('s3') return s3.Object(bucket_name, file_name).get()['Body'].read() def _get_rules(self): data = yaml.safe_load(self._file_contents) return data.get('Rules')
# class GitHub(Backend) # # def _get_rules(self): # #