Skip to content

Modules

ClusterResources is the object containing all the cluster resources:

  • buckets
  • bucket_policies
  • service_accounts
  • iam_policies
  • iam_policy_attachments
Source code in minio_manager/classes/resource_parser.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
class ClusterResources:
    """
    ClusterResources is the object containing all the cluster resources:

    - buckets
    - bucket_policies
    - service_accounts
    - iam_policies
    - iam_policy_attachments
    """

    buckets: list[Bucket]
    bucket_policies: list[BucketPolicy]
    service_accounts: list[ServiceAccount]
    iam_policies: list[IamPolicy]
    iam_policy_attachments: list[IamPolicyAttachment]

    def parse_buckets(self, buckets: list) -> list[Bucket]:
        """
        Parse the provided buckets with the following steps:

        For each provided bucket

            1. check the provided versioning. If versioning is not provided, set the default.
            2. check if an object lifecycle JSON file is provided, use the default_bucket_lifecycle_policy, or skip OLM
            3. parse the file and create a LifecycleConfig object for the bucket
            4. create a Bucket object

        Args:
            buckets: list of buckets to parse

        Returns: [Bucket]: list of Bucket objects
        """
        if not buckets:
            logger.debug("No buckets configured, skipping.")
            return []

        bucket_objects = []

        # Cluster-wide default lifecycle config; may be None when no default file is configured.
        default_lifecycle_config = self.parse_bucket_lifecycle_file(settings.default_lifecycle_policy_file)
        bucket_names = []

        try:
            logger.debug(f"Parsing {len(buckets)} buckets...")
            if settings.allowed_bucket_prefixes:
                noun = "prefix" if len(settings.allowed_bucket_prefixes) == 1 else "prefixes"
                prefixes_str = ", ".join(settings.allowed_bucket_prefixes)
                logger.info(f"Only allowing buckets with the following {noun}: {prefixes_str}")
            for bucket in buckets:
                name = bucket["name"]
                if name in bucket_names:
                    logger.error(f"Bucket '{name}' defined multiple times.")
                    increment_error_count()
                logger.debug(f"Parsing bucket {name}")
                allowed_prefixes = settings.allowed_bucket_prefixes
                # str.startswith() only accepts a str or a tuple of strs; wrap in tuple() so a
                # list-typed setting does not raise a TypeError (which the outer except would
                # misreport as a YAML formatting problem).
                if allowed_prefixes and not name.startswith(tuple(allowed_prefixes)):
                    logger.error(
                        f"Bucket '{name}' does not start with one of the required prefixes {allowed_prefixes}!"
                    )
                    increment_error_count()

                bucket_names.append(name)
                versioning = bucket.get("versioning")
                try:
                    versioning_config = VeCo(versioning) if versioning else VeCo(settings.default_bucket_versioning)
                except ValueError as ve:
                    logger.error(f"Error parsing versioning setting: {' '.join(ve.args)}")
                    versioning_config = VeCo(settings.default_bucket_versioning)  # workaround to use error count
                    increment_error_count()
                # NOTE(review): the fallback default here reads settings.default_bucket_versioning,
                # which looks like a copy-paste slip (a dedicated create-service-account default
                # seems intended) -- confirm against the settings class.
                create_sa = bool(bucket.get("create_service_account", settings.default_bucket_versioning))
                # Bug fix: resolve the lifecycle config per bucket so a bucket-specific
                # object_lifecycle_file no longer leaks into every subsequent bucket.
                lifecycle_config = default_lifecycle_config
                lifecycle_file = bucket.get("object_lifecycle_file")
                if lifecycle_file:
                    bucket_lifecycle = self.parse_bucket_lifecycle_file(lifecycle_file)
                    if isinstance(bucket_lifecycle, LifecycleConfig):
                        lifecycle_config = bucket_lifecycle
                bucket_objects.append(Bucket(name, create_sa, versioning_config, lifecycle_config))
        except TypeError:
            logger.error("Buckets must be defined as a list of YAML dictionaries!")
            increment_error_count()

        return bucket_objects

    def parse_bucket_lifecycle_file(self, lifecycle_file: str) -> LifecycleConfig | None:
        """
        Parse a bucket lifecycle config file.

        The config files must be in JSON format and can be best obtained by running the following command:
            mc ilm rule export $cluster/$bucket > $policy_file.json

        Args:
            lifecycle_file: lifecycle config file

        Returns: LifecycleConfig object, or None when the file is absent, unreadable, or yields no rules
        """
        if not lifecycle_file:
            return

        rules: list = []

        try:
            with Path(lifecycle_file).open() as f:
                config_data = json.load(f)
        except FileNotFoundError:
            logger.error(f"Lifecycle file {lifecycle_file} not found, skipping configuration.")
            increment_error_count()
            return
        except PermissionError:
            logger.error(f"Incorrect file permissions on {lifecycle_file}, skipping configuration.")
            increment_error_count()
            return

        try:
            rules_dict = config_data["Rules"]
        except KeyError:
            logger.error(f"Lifecycle file {lifecycle_file} is missing the required 'Rules' key.")
            increment_error_count()
            return

        try:
            for rule_data in rules_dict:
                parsed_rule = self.parse_bucket_lifecycle_rule(rule_data)
                rules.append(parsed_rule)
        except AttributeError:
            logger.error(f"Error parsing lifecycle file {lifecycle_file}. Is the format correct?")
            increment_error_count()

        if not rules:
            return

        return LifecycleConfig(rules)

    @staticmethod
    def parse_bucket_lifecycle_rule(rule_data: dict) -> Rule:
        """
        Parse a single bucket object lifecycle rule.

        TODO:
          Implement date and days in Expiration, implement Transition, NoncurrentVersionTransition, Filter, and
          AbortIncompleteMultipartUpload

        Args:
            rule_data: dict with rule data

        Returns: Rule object
        """
        rule_dict = {"status": rule_data.get("Status"), "rule_id": rule_data.get("ID")}

        expiration = rule_data.get("Expiration")
        if expiration:
            expire_delete_marker = expiration.get("ExpiredObjectDeleteMarker")
            rule_dict["expiration"] = Expiration(expired_object_delete_marker=expire_delete_marker)

        noncurrent_version_expiration = rule_data.get("NoncurrentVersionExpiration")
        if noncurrent_version_expiration:
            noncurrent_expire_days = noncurrent_version_expiration.get("NoncurrentDays")
            rule_dict["noncurrent_version_expiration"] = NoncurrentVersionExpiration(noncurrent_expire_days)

        # An empty filter is required for the rule to be valid
        rule_dict["rule_filter"] = Filter(prefix="")

        rule = Rule(**rule_dict)
        return rule

    @staticmethod
    def parse_bucket_policies(bucket_policies: list):
        """
        Parse a list of bucket policy definitions into BucketPolicy objects.

        Args:
            bucket_policies: list of bucket policies, each a dict with "bucket" and "policy_file" keys

        Returns: [BucketPolicy]
        """
        if not bucket_policies:
            logger.debug("No bucket policies configured, skipping.")
            return []

        bucket_policy_objects = []
        try:
            logger.debug(f"Parsing {len(bucket_policies)} bucket policies...")
            for bucket_policy in bucket_policies:
                bucket_policy_objects.append(BucketPolicy(bucket_policy["bucket"], bucket_policy["policy_file"]))
        except TypeError:
            logger.error("Bucket policies must be defined as a list of YAML dictionaries!")
            increment_error_count()

        return bucket_policy_objects

    @staticmethod
    def parse_service_accounts(service_accounts: list):
        """
        Parse a list of service account definitions into ServiceAccount objects.

        Args:
            service_accounts: list of service accounts, each a dict with a "name" and optional "policy_file" key

        Returns: [ServiceAccount]
        """
        if not service_accounts:
            logger.debug("No service accounts configured, skipping.")
            return []

        service_account_objects, service_account_names = [], []

        try:
            logger.debug(f"Parsing {len(service_accounts)} service accounts...")
            for service_account in service_accounts:
                name = service_account["name"]
                if name in service_account_names:
                    logger.error(f"Service account '{name}' defined multiple times.")
                    increment_error_count()
                service_account_names.append(name)
                policy_file = service_account.get("policy_file")
                sa_obj = ServiceAccount(name=name, policy_file=policy_file)
                service_account_objects.append(sa_obj)
        except TypeError:
            logger.error("Service accounts must be defined as a list of YAML dictionaries!")
            # NOTE(review): this exits immediately while the bucket/policy parsers only
            # increment the error count -- confirm the inconsistency is intentional.
            sys.exit(141)

        return service_account_objects

    @staticmethod
    def parse_iam_attachments(iam_policy_attachments: list):
        """
        Parse a list of IAM policy attachment definitions into IamPolicyAttachment objects.

        Args:
            iam_policy_attachments: list of IAM policy attachments, each a dict with "username" and "policies" keys

        Returns: [IamPolicyAttachment]
        """
        if not iam_policy_attachments:
            logger.debug("No IAM policy attachments configured, skipping.")
            return []

        iam_policy_attachment_objects = []
        try:
            logger.debug(f"Parsing {len(iam_policy_attachments)} IAM policy attachments...")
            for user in iam_policy_attachments:
                # Bug fix: previously appended to the input list (iam_policy_attachments),
                # mutating the list being iterated and returning an empty result.
                iam_policy_attachment_objects.append(IamPolicyAttachment(user["username"], user["policies"]))
        except TypeError:
            logger.error("IAM policy attachments must be defined as a list of YAML dictionaries!")
            sys.exit(150)

        return iam_policy_attachment_objects

    @staticmethod
    def parse_iam_policies(iam_policies: list):
        """
        Parse a list of IAM policy definitions into IamPolicy objects.

        Args:
            iam_policies: list of IAM policies, each a dict with "name" and "policy_file" keys

        Returns: [IamPolicy]
        """
        if not iam_policies:
            logger.debug("No IAM policies configured, skipping.")
            return []

        iam_policy_objects, iam_policy_names = [], []
        try:
            logger.debug(f"Parsing {len(iam_policies)} IAM policies...")
            for iam_policy in iam_policies:
                name = iam_policy["name"]
                if name in iam_policy_names:
                    logger.error(f"IAM policy '{name}' defined multiple times.")
                    increment_error_count()
                iam_policy_names.append(name)
                iam_policy_objects.append(IamPolicy(name, iam_policy["policy_file"]))
        except TypeError:
            logger.error("IAM policies must be defined as a list of YAML dictionaries!")
            increment_error_count()

        return iam_policy_objects

    def parse_resources(self, resources_file: str):
        """
        Parse resources from a YAML file, ensuring they are valid before trying to use them.

        Exits the process on unreadable/empty files, on any accumulated parse errors,
        or (successfully) when no resources are configured at all.

        Args:
            resources_file: string path to the YAML file
        """
        logger.info("Loading and parsing resources...")

        try:
            resources = read_yaml(resources_file)
        except FileNotFoundError:
            logger.error(f"Resources file {resources_file} not found.")
            sys.exit(170)
        except PermissionError:
            logger.error(f"Incorrect file permissions on {resources_file}.")
            sys.exit(171)

        if not resources:
            logger.error("Is the resources file empty?")
            sys.exit(172)

        buckets = resources.get("buckets")
        self.buckets = self.parse_buckets(buckets)

        bucket_policies = resources.get("bucket_policies")
        self.bucket_policies = self.parse_bucket_policies(bucket_policies)

        service_accounts = resources.get("service_accounts")
        self.service_accounts = self.parse_service_accounts(service_accounts)

        iam_policies = resources.get("iam_policies")
        self.iam_policies = self.parse_iam_policies(iam_policies)

        iam_policy_attachments = resources.get("iam_policy_attachments")
        self.iam_policy_attachments = self.parse_iam_attachments(iam_policy_attachments)

        error_count = get_error_count()
        if error_count > 0:
            noun = "error" if error_count == 1 else "errors"
            logger.error(f"{error_count} {noun} found while parsing resources, you must resolve them first.")
            sys.exit(173)

        if not any([buckets, bucket_policies, service_accounts, iam_policies, iam_policy_attachments]):
            logger.warning("No resources configured.")
            sys.exit(0)

parse_bucket_lifecycle_file(lifecycle_file)

Parse a bucket lifecycle config file.

The config files must be in JSON format and can best be obtained by running the following command:

mc ilm rule export $cluster/$bucket > $policy_file.json

Parameters:

Name Type Description Default
lifecycle_file str

lifecycle config file

required

Returns: LifecycleConfig object

Source code in minio_manager/classes/resource_parser.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
def parse_bucket_lifecycle_file(self, lifecycle_file: str) -> LifecycleConfig | None:
    """
    Parse a bucket lifecycle config file into a LifecycleConfig.

    The config files must be in JSON format and can be best obtained by running the following command:
        mc ilm rule export $cluster/$bucket > $policy_file.json

    Args:
        lifecycle_file: path to the lifecycle config file

    Returns: LifecycleConfig object, or None when the file is absent, unreadable, or yields no rules
    """
    if not lifecycle_file:
        return None

    # Load the JSON document; unreadable files are logged and counted, not fatal.
    try:
        config_data = json.loads(Path(lifecycle_file).read_text())
    except FileNotFoundError:
        logger.error(f"Lifecycle file {lifecycle_file} not found, skipping configuration.")
        increment_error_count()
        return None
    except PermissionError:
        logger.error(f"Incorrect file permissions on {lifecycle_file}, skipping configuration.")
        increment_error_count()
        return None

    try:
        rule_entries = config_data["Rules"]
    except KeyError:
        logger.error(f"Lifecycle file {lifecycle_file} is missing the required 'Rules' key.")
        increment_error_count()
        return None

    parsed_rules: list = []
    try:
        # Parse each rule in order; a malformed entry aborts the loop but keeps
        # whatever rules were successfully parsed before it.
        for entry in rule_entries:
            parsed_rules.append(self.parse_bucket_lifecycle_rule(entry))
    except AttributeError:
        logger.error(f"Error parsing lifecycle file {lifecycle_file}. Is the format correct?")
        increment_error_count()

    return LifecycleConfig(parsed_rules) if parsed_rules else None

parse_bucket_lifecycle_rule(rule_data) staticmethod

Parse a single bucket object lifecycle rule.

TODO

Implement date and days in Expiration, implement Transition, NoncurrentVersionTransition, Filter, and AbortIncompleteMultipartUpload

Parameters:

Name Type Description Default
rule_data dict

dict with rule data

required

Returns: Rule object

Source code in minio_manager/classes/resource_parser.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
@staticmethod
def parse_bucket_lifecycle_rule(rule_data: dict) -> Rule:
    """
    Parse a single bucket object lifecycle rule.

    TODO:
      Implement date and days in Expiration, implement Transition, NoncurrentVersionTransition, Filter, and
      AbortIncompleteMultipartUpload

    Args:
        rule_data: dict with rule data

    Returns: Rule object
    """
    rule_kwargs = {
        "status": rule_data.get("Status"),
        "rule_id": rule_data.get("ID"),
        # An empty filter is required for the rule to be valid
        "rule_filter": Filter(prefix=""),
    }

    expiration_data = rule_data.get("Expiration")
    if expiration_data:
        rule_kwargs["expiration"] = Expiration(
            expired_object_delete_marker=expiration_data.get("ExpiredObjectDeleteMarker")
        )

    nve_data = rule_data.get("NoncurrentVersionExpiration")
    if nve_data:
        rule_kwargs["noncurrent_version_expiration"] = NoncurrentVersionExpiration(nve_data.get("NoncurrentDays"))

    return Rule(**rule_kwargs)

parse_bucket_policies(bucket_policies) staticmethod

Parse a list of bucket policy definitions into BucketPolicy objects.

Parameters:

Name Type Description Default
bucket_policies list

list of bucket policies

required

Returns: [BucketPolicy]

Source code in minio_manager/classes/resource_parser.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
@staticmethod
def parse_bucket_policies(bucket_policies: list):
    """
    Parse a list of bucket policy definitions into BucketPolicy objects.

    Args:
        bucket_policies: list of bucket policies

    Returns: [BucketPolicy]
    """
    if not bucket_policies:
        logger.debug("No bucket policies configured, skipping.")
        return []

    policy_objects = []
    try:
        logger.debug(f"Parsing {len(bucket_policies)} bucket policies...")
        # Each entry must be a mapping with "bucket" and "policy_file" keys;
        # anything else triggers the TypeError handler below.
        for entry in bucket_policies:
            policy_objects.append(BucketPolicy(entry["bucket"], entry["policy_file"]))
    except TypeError:
        logger.error("Bucket policies must be defined as a list of YAML dictionaries!")
        increment_error_count()

    return policy_objects

parse_buckets(buckets)

Parse the provided buckets with the following steps:

For each provided bucket

1. check the provided versioning. If versioning is not provided, set the default.
2. check if an object lifecycle JSON file is provided, use the default_bucket_lifecycle_policy, or skip OLM
3. parse the file and create a LifecycleConfig object for the bucket
4. create a Bucket object

Parameters:

Name Type Description Default
buckets list

list of buckets to parse

required

Returns: [Bucket]: list of Bucket objects

Source code in minio_manager/classes/resource_parser.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def parse_buckets(self, buckets: list) -> list[Bucket]:
    """
    Parse the provided buckets with the following steps:

    For each provided bucket

        1. check the provided versioning. If versioning is not provided, set the default.
        2. check if an object lifecycle JSON file is provided, use the default_bucket_lifecycle_policy, or skip OLM
        3. parse the file and create a LifecycleConfig object for the bucket
        4. create a Bucket object

    Args:
        buckets: list of buckets to parse

    Returns: [Bucket]: list of Bucket objects
    """
    if not buckets:
        logger.debug("No buckets configured, skipping.")
        return []

    bucket_objects = []

    # Cluster-wide default lifecycle config; may be None when no default file is configured.
    default_lifecycle_config = self.parse_bucket_lifecycle_file(settings.default_lifecycle_policy_file)
    bucket_names = []

    try:
        logger.debug(f"Parsing {len(buckets)} buckets...")
        if settings.allowed_bucket_prefixes:
            noun = "prefix" if len(settings.allowed_bucket_prefixes) == 1 else "prefixes"
            prefixes_str = ", ".join(settings.allowed_bucket_prefixes)
            logger.info(f"Only allowing buckets with the following {noun}: {prefixes_str}")
        for bucket in buckets:
            name = bucket["name"]
            if name in bucket_names:
                logger.error(f"Bucket '{name}' defined multiple times.")
                increment_error_count()
            logger.debug(f"Parsing bucket {name}")
            allowed_prefixes = settings.allowed_bucket_prefixes
            # str.startswith() only accepts a str or a tuple of strs; wrap in tuple() so a
            # list-typed setting does not raise a TypeError (which the outer except would
            # misreport as a YAML formatting problem).
            if allowed_prefixes and not name.startswith(tuple(allowed_prefixes)):
                logger.error(
                    f"Bucket '{name}' does not start with one of the required prefixes {allowed_prefixes}!"
                )
                increment_error_count()

            bucket_names.append(name)
            versioning = bucket.get("versioning")
            try:
                versioning_config = VeCo(versioning) if versioning else VeCo(settings.default_bucket_versioning)
            except ValueError as ve:
                logger.error(f"Error parsing versioning setting: {' '.join(ve.args)}")
                versioning_config = VeCo(settings.default_bucket_versioning)  # workaround to use error count
                increment_error_count()
            # NOTE(review): the fallback default here reads settings.default_bucket_versioning,
            # which looks like a copy-paste slip (a dedicated create-service-account default
            # seems intended) -- confirm against the settings class.
            create_sa = bool(bucket.get("create_service_account", settings.default_bucket_versioning))
            # Bug fix: resolve the lifecycle config per bucket so a bucket-specific
            # object_lifecycle_file no longer leaks into every subsequent bucket.
            lifecycle_config = default_lifecycle_config
            lifecycle_file = bucket.get("object_lifecycle_file")
            if lifecycle_file:
                bucket_lifecycle = self.parse_bucket_lifecycle_file(lifecycle_file)
                if isinstance(bucket_lifecycle, LifecycleConfig):
                    lifecycle_config = bucket_lifecycle
            bucket_objects.append(Bucket(name, create_sa, versioning_config, lifecycle_config))
    except TypeError:
        logger.error("Buckets must be defined as a list of YAML dictionaries!")
        increment_error_count()

    return bucket_objects

parse_iam_attachments(iam_policy_attachments) staticmethod

Parse a list of IAM policy attachment definitions into IamPolicyAttachment objects.

Parameters:

Name Type Description Default
iam_policy_attachments list

list of IAM policy attachments

required

Returns: [IamPolicyAttachment]

Source code in minio_manager/classes/resource_parser.py
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
@staticmethod
def parse_iam_attachments(iam_policy_attachments: list):
    """
    Parse a list of IAM policy attachment definitions into IamPolicyAttachment objects.

    Args:
        iam_policy_attachments: list of IAM policy attachments, each a dict with "username" and "policies" keys

    Returns: [IamPolicyAttachment]
    """
    if not iam_policy_attachments:
        logger.debug("No IAM policy attachments configured, skipping.")
        return []

    iam_policy_attachment_objects = []
    try:
        logger.debug(f"Parsing {len(iam_policy_attachments)} IAM policy attachments...")
        for user in iam_policy_attachments:
            # Bug fix: previously appended to the input list (iam_policy_attachments),
            # mutating the list being iterated and always returning an empty result.
            iam_policy_attachment_objects.append(IamPolicyAttachment(user["username"], user["policies"]))
    except TypeError:
        logger.error("IAM policy attachments must be defined as a list of YAML dictionaries!")
        sys.exit(150)

    return iam_policy_attachment_objects

parse_iam_policies(iam_policies) staticmethod

Parse a list of IAM policy definitions into IamPolicy objects.

Parameters:

Name Type Description Default
iam_policies dict

list of IAM policies

required

Returns: [IamPolicy]

Source code in minio_manager/classes/resource_parser.py
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
@staticmethod
def parse_iam_policies(iam_policies: list):
    """
    Parse a list of IAM policy definitions into IamPolicy objects.

    Duplicate policy names are logged and counted as errors, but the duplicate
    entry is still included in the returned list.

    Args:
        iam_policies: list of IAM policies, each a dict with "name" and "policy_file" keys

    Returns: [IamPolicy]
    """
    if not iam_policies:
        logger.debug("No IAM policies configured, skipping.")
        return []

    iam_policy_objects, iam_policy_names = [], []
    try:
        logger.debug(f"Parsing {len(iam_policies)} IAM policies...")
        for iam_policy in iam_policies:
            name = iam_policy["name"]
            if name in iam_policy_names:
                logger.error(f"IAM policy '{name}' defined multiple times.")
                increment_error_count()
            iam_policy_names.append(name)
            iam_policy_objects.append(IamPolicy(name, iam_policy["policy_file"]))
    except TypeError:
        # A non-dict entry (e.g. a plain string in the YAML list) ends up here.
        logger.error("IAM policies must be defined as a list of YAML dictionaries!")
        increment_error_count()

    return iam_policy_objects

parse_resources(resources_file)

Parse resources from a YAML file, ensuring they are valid before trying to use them.

Parameters:

Name Type Description Default
resources_file str

string path to the YAML file

required
Source code in minio_manager/classes/resource_parser.py
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
def parse_resources(self, resources_file: str):
    """
    Parse resources from a YAML file, ensuring they are valid before trying to use them.

    Exits the process on unreadable/empty files, on any accumulated parse errors,
    or (successfully) when no resources are configured at all.

    Args:
        resources_file: string path to the YAML file
    """
    logger.info("Loading and parsing resources...")

    try:
        resources = read_yaml(resources_file)
    except FileNotFoundError:
        logger.error(f"Resources file {resources_file} not found.")
        sys.exit(170)
    except PermissionError:
        logger.error(f"Incorrect file permissions on {resources_file}.")
        sys.exit(171)

    if not resources:
        logger.error("Is the resources file empty?")
        sys.exit(172)

    # Pull each section out of the YAML document up front.
    buckets = resources.get("buckets")
    bucket_policies = resources.get("bucket_policies")
    service_accounts = resources.get("service_accounts")
    iam_policies = resources.get("iam_policies")
    iam_policy_attachments = resources.get("iam_policy_attachments")

    # Parse every section even when earlier ones had errors, so all problems
    # are reported in a single run.
    self.buckets = self.parse_buckets(buckets)
    self.bucket_policies = self.parse_bucket_policies(bucket_policies)
    self.service_accounts = self.parse_service_accounts(service_accounts)
    self.iam_policies = self.parse_iam_policies(iam_policies)
    self.iam_policy_attachments = self.parse_iam_attachments(iam_policy_attachments)

    error_count = get_error_count()
    if error_count > 0:
        noun = "error" if error_count == 1 else "errors"
        logger.error(f"{error_count} {noun} found while parsing resources, you must resolve them first.")
        sys.exit(173)

    if not any([buckets, bucket_policies, service_accounts, iam_policies, iam_policy_attachments]):
        logger.warning("No resources configured.")
        sys.exit(0)

parse_service_accounts(service_accounts) staticmethod

Parse a list of service account definitions into ServiceAccount objects.

Parameters:

Name Type Description Default
service_accounts list

list of service accounts

required

Returns: [ServiceAccount]

Source code in minio_manager/classes/resource_parser.py
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
@staticmethod
def parse_service_accounts(service_accounts: list):
    """
    Parse a list of service account definitions into ServiceAccount objects.

    Duplicate account names are logged and counted as errors, but the duplicate
    entry is still included in the returned list.

    Args:
        service_accounts: list of service accounts, each a dict with a "name" and optional "policy_file" key

    Returns: [ServiceAccount]
    """
    if not service_accounts:
        logger.debug("No service accounts configured, skipping.")
        return []

    service_account_objects, service_account_names = [], []

    try:
        logger.debug(f"Parsing {len(service_accounts)} service accounts...")
        for service_account in service_accounts:
            name = service_account["name"]
            if name in service_account_names:
                logger.error(f"Service account '{name}' defined multiple times.")
                increment_error_count()
            service_account_names.append(name)
            policy_file = service_account.get("policy_file")
            sa_obj = ServiceAccount(name=name, policy_file=policy_file)
            service_account_objects.append(sa_obj)
    except TypeError:
        logger.error("Service accounts must be defined as a list of YAML dictionaries!")
        # NOTE(review): exits immediately here, while the bucket/IAM-policy parsers only
        # increment the error count -- confirm the inconsistency is intentional.
        sys.exit(141)

    return service_account_objects

Bases: Exception

Base class for Minio Manager errors.

Source code in minio_manager/classes/errors.py
1
2
3
4
5
class MinioManagerBaseError(Exception):
    """Base class for Minio Manager errors."""

    def __init__(self, message: str, cause=None):
        # Append the underlying cause to the message when one is given.
        if cause:
            full_message = f"{message}: {cause}"
        else:
            full_message = message
        super().__init__(full_message)

Bases: Filter

The MinioManagerFilter is a custom logging Filter that masks secret values.

Source code in minio_manager/classes/logging_config.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class MinioManagerFilter(Filter):
    """
    The MinioManagerFilter is a custom logging Filter that masks secret values.
    """

    wrapper_secret_re = re.compile(r"--secret-key (?P<secret>[\w+/]*)")
    alias_set_secret_re = re.compile(r"alias set .+ (?P<secret>[\w+/]*)$")
    env_keepass_password_re = re.compile(r"MINIO_MANAGER_KEEPASS_PASSWORD: (?P<secret>[\w+/]*)$")
    env_secret_key_re = re.compile(r"MINIO_MANAGER_SECRET_BACKEND_S3_SECRET_KEY: (?P<secret>[\w+/]*)$")

    # Each entry pairs a cheap substring trigger with the regex used to locate the secret.
    _MASK_RULES = (
        ("--secret", wrapper_secret_re),
        ("alias set", alias_set_secret_re),
        ("MINIO_MANAGER_KEEPASS_PASSWORD", env_keepass_password_re),
        ("MINIO_MANAGER_SECRET_BACKEND_S3_SECRET_KEY", env_secret_key_re),
    )

    def filter(self, record: LogRecord) -> bool:
        """Mask any recognised secret in the record's message; never drops a record."""
        if not isinstance(record.msg, str):
            return True

        for trigger, pattern in self._MASK_RULES:
            if trigger in record.msg:
                record.msg = self.mask_secret(record.msg, pattern)

        return True

    @staticmethod
    def mask_secret(message: str, regex: re.Pattern) -> str:
        """Replace the regex's 'secret' capture group in message with asterisks."""
        match = regex.search(message)
        if match is None:
            return message
        return message.replace(match.group("secret"), "************")

Bases: Formatter

The MinioManagerFormatter is a custom logging Formatter that provides formatting and colourises log messages.

Source code in minio_manager/classes/logging_config.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
class MinioManagerFormatter(Formatter):
    """
    The MinioManagerFormatter is a custom logging Formatter that provides formatting and colourises log messages.
    """

    def __init__(self, level: int):
        """Pick the log format based on the configured level.

        Args:
            level: the numeric logging level; INFO selects the compact format,
                anything else the verbose format with source locations
        """
        self.log_level = level
        # Bug fix: compare levels with ==, not `is` — identity comparison of
        # ints only works by accident of CPython's small-int caching.
        if level == INFO:
            log_format = "[{asctime}] [{levelname:^8s}] {message}"
            super().__init__(fmt=log_format, datefmt="%Y-%m-%d %H:%M:%S", style="{")
        else:
            log_format = "[{asctime}] [{levelname:^8s}] [{filename:>26s}:{lineno:<4d} - {funcName:<24s} ] {message}"
            super().__init__(fmt=log_format, style="{")

    def format(self, record: LogRecord):
        """Colourise string messages for levels present in COLORS, then format as usual."""
        if isinstance(record.msg, str) and record.levelname in COLORS:
            record.msg = COLORS[record.levelname] + record.msg + RESET

        # noinspection StrFormat
        return super().format(record)

Bases: Logger

The MinioManagerLogger is a custom Logger that implements our MinioManagerFilter and MinioManagerFormatter.

Source code in minio_manager/classes/logging_config.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
class MinioManagerLogger(Logger):
    """
    The MinioManagerLogger is a custom Logger that implements our MinioManagerFilter and MinioManagerFormatter.
    """

    def __init__(self, name: str, level: str):
        """Create a logger wired up with the project's formatter and secret-masking filter.

        Args:
            name: the logger name
            level: "INFO" for info level; any other value enables debug level
        """
        super().__init__(name)
        self.setLevel(INFO if level == "INFO" else DEBUG)

        stream_handler = StreamHandler()
        stream_handler.setFormatter(MinioManagerFormatter(self.level))
        stream_handler.addFilter(MinioManagerFilter())
        self.addHandler(stream_handler)

The McWrapper is responsible for executing mc commands.

To be replaced with the new functions in the updated MinioAdmin library.

Source code in minio_manager/classes/mc_wrapper.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
class McWrapper:
    """
    The McWrapper is responsible for executing mc commands.

    To be replaced with the new functions in the updated MinioAdmin library.
    """

    def __init__(self, timeout=60):
        """Set up an isolated mc configuration directory and configure the cluster alias.

        Args:
            timeout: maximum runtime in seconds for a single mc invocation
        """
        logger.debug("Initialising McWrapper")
        self.timeout = timeout
        # A private config dir keeps this run's aliases separate from any
        # mc configuration already present on the system.
        self.mc_config_path = TemporaryDirectory(prefix="mm.mc.")
        self.mc = self.find_mc_command()
        self.configure(
            endpoint=settings.s3_endpoint,
            access_key=controller_user.access_key,
            secret_key=controller_user.secret_key,
            secure=settings.s3_endpoint_secure,
        )
        logger.debug("McWrapper initialised")

    def _run(self, args: list, multiline=False) -> list[dict] | dict:
        """Execute mc command and return JSON output.

        Args:
            args: the mc subcommand and its arguments
            multiline: True when mc emits one JSON document per line (list-style commands)

        Returns: a list of dicts when multiline is True, otherwise a single dict
        """
        logger.debug(f"Running: {self.mc} --config-dir {self.mc_config_path.name} --json {' '.join(args)}")
        proc = subprocess.run(
            [self.mc, "--config-dir", self.mc_config_path.name, "--json", *args],  # noqa: S603
            capture_output=True,
            timeout=self.timeout,
            text=True,
        )
        if not proc.stdout:
            # No output from mc; return an empty response of the expected shape
            return [] if multiline else {}
        if multiline:
            return [json.loads(line) for line in proc.stdout.splitlines()]
        return json.loads(proc.stdout)

    @staticmethod
    def find_mc_command() -> Path:
        """Configure the path to the mc command, as it may be named 'mcli' on some systems.

        Raises:
            FileNotFoundError: when neither 'mc' nor 'mcli' can be found in the PATH
        """
        mc = shutil.which("mc") or shutil.which("mcli")
        if not mc:
            # Bug fix: previously this fell through to Path(None), raising an opaque TypeError.
            raise FileNotFoundError("Unable to find the 'mc' (or 'mcli') command in the PATH")
        return Path(mc)

    def configure(self, endpoint: str, access_key: str, secret_key: str, secure: bool):
        """Ensure the proper alias is configured for the cluster.

        Args:
            endpoint: S3 endpoint host (without scheme; the scheme is derived from `secure`)
            access_key: controller user access key
            secret_key: controller user secret key
            secure: whether to connect over HTTPS
        """
        logger.info("Configuring 'mc'...")
        url = f"https://{endpoint}" if secure else f"http://{endpoint}"
        alias_set_resp = self._run(["alias", "set", settings.cluster_name, url, access_key, secret_key])
        if alias_set_resp.get("error"):
            error_details = alias_set_resp["error"]["cause"]["error"]
            try:
                raise_specific_error(error_details["Code"], error_details["Message"])
            except AttributeError as ae:
                # The error payload did not have the structure we expected
                logger.exception("Unknown error!")
                raise MinioManagerBaseError(alias_set_resp["error"]["cause"]["message"]) from ae

        cluster_ready = self._run(["ready", settings.cluster_name])
        healthy = cluster_ready.get("healthy")
        if healthy:
            # Cluster is configured & available
            return

        if cluster_ready.get("error"):
            # A connection error occurred
            raise ConnectionError(cluster_ready["error"])

    def _service_account_run(self, cmd: str, args: list) -> list[dict] | dict:
        """
        mc admin user svcacct helper function, no need to specify the cluster name
        Args:
            cmd: str, the svcacct command
            args: list of arguments to the command

        Returns: list | dict

        """
        multiline = cmd in ["list", "ls"]
        resp = self._run(["admin", "user", "svcacct", cmd, settings.cluster_name, *args], multiline=multiline)
        if multiline and not resp:
            # Bug fix: an empty multiline response has no error object to
            # inspect; indexing resp[0] would raise IndexError.
            return resp
        resp_error = resp[0] if multiline else resp
        if "error" in resp_error:
            resp_error = resp_error["error"]
            error_details = resp_error["cause"]["error"]
            raise_specific_error(error_details["Code"], error_details["Message"])
        return resp

    def service_account_add(self, credentials: ServiceAccount) -> ServiceAccount:
        """
        mc admin user svcacct add alias-name 'username' --name "sa-test-key"

        Args:
            credentials (ServiceAccount): object containing at least the user-friendly name of the service account

        Returns: ServiceAccount with the access and secret keys added to it
        """
        # Create the service account in MinIO. Optional fields are only passed
        # along when set; otherwise MinIO generates the keys itself.
        args = [settings.minio_controller_user, "--name", credentials.name]
        if credentials.description:
            args.extend(["--description", credentials.description])
        if credentials.access_key:
            args.extend(["--access-key", credentials.access_key])
        if credentials.secret_key:
            args.extend(["--secret-key", credentials.secret_key])
        resp = self._service_account_run("add", args)
        credentials.access_key = resp["accessKey"]
        credentials.secret_key = resp["secretKey"]
        return credentials

    def service_account_list(self, access_key: str) -> list[dict]:
        """mc admin user svcacct ls alias-name 'access_key'"""
        return self._service_account_run("ls", [access_key])

    def service_account_info(self, access_key: str) -> dict:
        """mc admin user svcacct info alias-name service-account-access-key"""
        return self._service_account_run("info", [access_key])

    def service_account_delete(self):
        """mc admin user svcacct rm alias-name service-account-access-key"""
        raise NotImplementedError

    def service_account_get_policy(self, access_key: str) -> dict:
        """Return the policy document embedded in the service account info."""
        info = self.service_account_info(access_key)
        return info["policy"]

    def service_account_set_policy(self, access_key: str, policy_file: str):
        """mc admin user svcacct edit alias-name service-account-access-key --policy policy-file"""
        return self._service_account_run("edit", [access_key, "--policy", policy_file])

    def cleanup(self):
        """
        We want to clean up the mc config file before the process finishes as otherwise it can cause issues with
        subsequent runs for different environments.
        """
        # Safety net: never delete a directory outside /tmp.
        if not self.mc_config_path.name.startswith("/tmp"):  # noqa: S108
            raise MinioManagerBaseError("CleanUpError", "Error during cleanup: temporary directory is not in /tmp")
        logger.debug(f"Deleting temporary mc config directory {self.mc_config_path.name}")
        self.mc_config_path.cleanup()

cleanup()

We want to clean up the mc config file before the process finishes as otherwise it can cause issues with subsequent runs for different environments.

Source code in minio_manager/classes/mc_wrapper.py
141
142
143
144
145
146
147
148
149
def cleanup(self):
    """
    We want to clean up the mc config file before the process finishes as otherwise it can cause issues with
    subsequent runs for different environments.
    """
    config_dir = self.mc_config_path
    # Safety net: refuse to delete anything outside /tmp.
    if not config_dir.name.startswith("/tmp"):  # noqa: S108
        raise MinioManagerBaseError("CleanUpError", "Error during cleanup: temporary directory is not in /tmp")
    logger.debug(f"Deleting temporary mc config directory {config_dir.name}")
    config_dir.cleanup()

configure(endpoint, access_key, secret_key, secure)

Ensure the proper alias is configured for the cluster.

Source code in minio_manager/classes/mc_wrapper.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def configure(self, endpoint: str, access_key: str, secret_key: str, secure: bool):
    """Ensure the proper alias is configured for the cluster.

    Args:
        endpoint: S3 endpoint host (without scheme; the scheme is derived from `secure`)
        access_key: controller user access key
        secret_key: controller user secret key
        secure: whether to connect over HTTPS
    """
    logger.info("Configuring 'mc'...")
    url = f"https://{endpoint}" if secure else f"http://{endpoint}"
    alias_set_resp = self._run(["alias", "set", settings.cluster_name, url, access_key, secret_key])
    if alias_set_resp.get("error"):
        error_details = alias_set_resp["error"]["cause"]["error"]
        try:
            raise_specific_error(error_details["Code"], error_details["Message"])
        except AttributeError as ae:
            # The error payload did not have the structure we expected
            logger.exception("Unknown error!")
            raise MinioManagerBaseError(alias_set_resp["error"]["cause"]["message"]) from ae

    # Verify the cluster is actually reachable with the configured alias.
    cluster_ready = self._run(["ready", settings.cluster_name])
    healthy = cluster_ready.get("healthy")
    if healthy:
        # Cluster is configured & available
        return

    if cluster_ready.get("error"):
        # A connection error occurred
        raise ConnectionError(cluster_ready["error"])

find_mc_command() staticmethod

Configure the path to the mc command, as it may be named 'mcli' on some systems.

Source code in minio_manager/classes/mc_wrapper.py
49
50
51
52
53
54
55
@staticmethod
def find_mc_command() -> Path:
    """Configure the path to the mc command, as it may be named 'mcli' on some systems.

    Raises:
        FileNotFoundError: when neither 'mc' nor 'mcli' can be found in the PATH
    """
    mc = shutil.which("mc") or shutil.which("mcli")
    if not mc:
        # Bug fix: previously this fell through to Path(None), raising an opaque TypeError.
        raise FileNotFoundError("Unable to find the 'mc' (or 'mcli') command in the PATH")
    return Path(mc)

service_account_add(credentials)

mc admin user svcacct add alias-name 'username' --name "sa-test-key"

Parameters:

Name Type Description Default
credentials ServiceAccount

object containing at least the user-friendly name of the service account

required

Returns: ServiceAccount with the access and secret keys added to it

Source code in minio_manager/classes/mc_wrapper.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def service_account_add(self, credentials: ServiceAccount) -> ServiceAccount:
    """
    mc admin user svcacct add alias-name 'username' --name "sa-test-key"

    Args:
        credentials (ServiceAccount): object containing at least the user-friendly name of the service account

    Returns: ServiceAccount with the access and secret keys added to it
    """
    # Create the service account in MinIO
    # Optional fields are only passed along when they are set; otherwise the
    # server side generates the keys (returned in the response below).
    args = [settings.minio_controller_user, "--name", credentials.name]
    if credentials.description:
        args.extend(["--description", credentials.description])
    if credentials.access_key:
        args.extend(["--access-key", credentials.access_key])
    if credentials.secret_key:
        args.extend(["--secret-key", credentials.secret_key])
    resp = self._service_account_run("add", args)
    # Copy the (possibly server-generated) credentials back onto the object we were given.
    credentials.access_key = resp["accessKey"]
    credentials.secret_key = resp["secretKey"]
    return credentials

service_account_delete()

mc admin user svcacct rm alias-name service-account-access-key

Source code in minio_manager/classes/mc_wrapper.py
129
130
131
def service_account_delete(self):
    """mc admin user svcacct rm alias-name service-account-access-key"""
    # Deletion is not implemented yet; fail loudly rather than silently doing nothing.
    raise NotImplementedError

service_account_info(access_key)

mc admin user svcacct info alias-name service-account-access-key

Source code in minio_manager/classes/mc_wrapper.py
125
126
127
def service_account_info(self, access_key: str) -> dict:
    """mc admin user svcacct info alias-name service-account-access-key"""
    info_args = [access_key]
    return self._service_account_run("info", info_args)

service_account_list(access_key)

mc admin user svcacct ls alias-name 'access_key'

Source code in minio_manager/classes/mc_wrapper.py
121
122
123
def service_account_list(self, access_key: str) -> list[dict]:
    """mc admin user svcacct ls alias-name 'access_key'"""
    ls_args = [access_key]
    return self._service_account_run("ls", ls_args)

service_account_set_policy(access_key, policy_file)

mc admin user svcacct edit alias-name service-account-access-key --policy policy-file

Source code in minio_manager/classes/mc_wrapper.py
137
138
139
def service_account_set_policy(self, access_key: str, policy_file: str):
    """mc admin user svcacct edit alias-name service-account-access-key --policy policy-file"""
    edit_args = [access_key, "--policy", policy_file]
    return self._service_account_run("edit", edit_args)

Bucket represents an S3 bucket.

name: The name of the bucket
create_service_account: Whether to create a service account for the bucket (True or False)
versioning: The versioning configuration for the bucket (Enabled or Suspended)
lifecycle_config: The path to a lifecycle configuration JSON file for the bucket

Source code in minio_manager/classes/minio_resources.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class Bucket:
    """
    Bucket represents an S3 bucket.

    name: The name of the bucket
    create_service_account: Whether to create a service account for the bucket (True or False)
    versioning: The versioning configuration for the bucket (Enabled or Suspended)
    lifecycle_config: The path to a lifecycle configuration JSON file for the bucket
    """

    def __init__(
        self,
        name: str,
        create_service_account: bool = settings.auto_create_service_account,
        versioning: VersioningConfig | None = None,
        lifecycle_config: LifecycleConfig | None = None,
    ):
        # S3 restricts bucket names to 3-63 characters. A violation is logged
        # and counted, but processing continues so that all configuration
        # errors can be reported in a single run.
        name_length = len(name)
        if not 3 <= name_length <= 63:
            logger.error(f"Bucket '{name}' is {len(name)} characters long;")
            logger.error("Bucket names must be between 3 and 63 characters in length!")
            increment_error_count()

        self.name = name
        self.create_sa = create_service_account
        self.versioning = versioning
        self.lifecycle_config = lifecycle_config

BucketPolicy represents an S3 bucket policy.

bucket: The name of the bucket
policy_file: The path to a JSON policy file

Source code in minio_manager/classes/minio_resources.py
44
45
46
47
48
49
50
51
52
53
54
55
class BucketPolicy:
    """
    BucketPolicy represents an S3 bucket policy.

    bucket: The name of the bucket
    policy_file: The path to a JSON policy file
    """

    # TODO: try loading the policy file in order to validate its contents
    def __init__(self, bucket: str, policy_file: str):
        self.bucket, self.policy_file = bucket, policy_file

ServiceAccount represents a MinIO service account (or S3 access key).

name: The name of the service account
description: The description of the service account
access_key: The access key of the service account
secret_key: The secret key of the service account
policy: Optional custom policy for the service account
policy_file: The path to a JSON policy file

Source code in minio_manager/classes/minio_resources.py
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
class ServiceAccount:
    """
    ServiceAccount represents a MinIO service account (or S3 access key).

    name: The name of the service account
    description: The description of the service account
    access_key: The access key of the service account
    secret_key: The secret key of the service account
    policy: Optional custom policy for the service account
    policy_file: The path to a JSON policy file
    """

    policy = ClassVar[dict]
    policy_file: Path | None
    policy_generated = False

    def __init__(
        self,
        name: str,
        description: str = "",
        access_key: str | None = None,
        secret_key: str | None = None,
        policy: dict | None = None,
        policy_file: Path | str | None = None,
    ):
        if len(name) > 32:
            self.name = name[:32]
            self.description = name + " " + description
        else:
            self.name = name
            self.description = description

        self.full_name = name
        self.access_key = access_key
        self.secret_key = secret_key
        if policy_file:
            if isinstance(policy_file, Path):
                self.policy_file = policy_file
            else:
                self.policy_file = Path(policy_file)
        else:
            self.policy_file = None
        if policy:
            self.policy = policy
        elif self.policy_file:
            try:
                self.policy = read_json(self.policy_file)
            except FileNotFoundError:
                logger.error(f"Policy file '{self.policy_file}' for service account '{name}' not found!")
                increment_error_count()

    def generate_service_account_policy(self):
        """
        Generate a policy for a service account that gives access to a bucket with the same name as the service account.
        """
        if settings.service_account_policy_base_file:
            with Path(settings.service_account_policy_base_file).open() as base:
                base_policy = base.read()
        else:
            from minio_manager.resources.policies import service_account_policy_base

            base_policy = json.dumps(service_account_policy_base)

        temp_file = NamedTemporaryFile(prefix=self.full_name, suffix=".json", delete=False)
        with temp_file as out:
            new_content = base_policy.replace("BUCKET_NAME_REPLACE_ME", self.full_name)
            out.write(new_content.encode("utf-8"))

        self.policy = json.loads(new_content)
        self.policy_file = Path(temp_file.name)
        self.policy_generated = True

generate_service_account_policy()

Generate a policy for a service account that gives access to a bucket with the same name as the service account.

Source code in minio_manager/classes/minio_resources.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def generate_service_account_policy(self):
    """
    Generate a policy for a service account that gives access to a bucket with the same name as the service account.
    """
    # Use the operator-provided base policy file when configured, otherwise
    # fall back to the bundled default policy.
    if settings.service_account_policy_base_file:
        with Path(settings.service_account_policy_base_file).open() as base:
            base_policy = base.read()
    else:
        from minio_manager.resources.policies import service_account_policy_base

        base_policy = json.dumps(service_account_policy_base)

    # delete=False: the file must outlive this block so it can be handed to mc
    # later; NOTE(review): presumably cleaned up elsewhere — confirm.
    temp_file = NamedTemporaryFile(prefix=self.full_name, suffix=".json", delete=False)
    with temp_file as out:
        # The base policy uses BUCKET_NAME_REPLACE_ME as a placeholder for the bucket name.
        new_content = base_policy.replace("BUCKET_NAME_REPLACE_ME", self.full_name)
        out.write(new_content.encode("utf-8"))

    self.policy = json.loads(new_content)
    self.policy_file = Path(temp_file.name)
    self.policy_generated = True

IamPolicy represents an S3 IAM policy.

name: The name of the policy
policy_file: The path to a JSON policy file

Source code in minio_manager/classes/minio_resources.py
131
132
133
134
135
136
137
138
139
140
141
class IamPolicy:
    """
    IamPolicy represents an S3 IAM policy.

    name: The name of the policy
    policy_file: The path to a JSON policy file
    """

    def __init__(self, name: str, policy_file: str):
        self.name, self.policy_file = name, policy_file

IamPolicyAttachment represents an S3 IAM policy attachment.

username: The name of the user to attach the policies to
policies: A list of policies to attach to the user

Source code in minio_manager/classes/minio_resources.py
144
145
146
147
148
149
150
151
152
153
154
class IamPolicyAttachment:
    """
    IamPolicyAttachment represents an S3 IAM policy attachment.

    username: The name of the user to attach the policies to
    policies: A list of policies to attach to the user
    """

    def __init__(self, username: str, policies: list):
        self.username, self.policies = username, policies

SecretManager is responsible for managing credentials

Source code in minio_manager/classes/secrets.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
class SecretManager:
    """SecretManager is responsible for managing credentials"""

    def __init__(self):
        logger.info("Loading secret backend...")
        # Set to True whenever a credential is written, so cleanup() knows the
        # backend must be saved/uploaded before exit.
        self.backend_dirty = False
        self.backend_type = settings.secret_backend_type
        self.backend_bucket = settings.secret_backend_s3_bucket
        self.backend_secure = settings.s3_endpoint_secure
        self.backend_filename = None
        self.keepass_temp_file = None
        self.keepass_group = None
        self.backend_s3 = self.setup_backend_s3()
        self.backend = self.setup_backend()
        logger.debug(f"Secret backend initialised with {self.backend_type}")

    def setup_backend_s3(self):
        """Create the S3 client used for the secret backend bucket and verify access to it."""
        endpoint = settings.s3_endpoint
        access_key = settings.secret_backend_s3_access_key
        secret_key = settings.secret_backend_s3_secret_key
        logger.debug(f"Setting up secret bucket {self.backend_bucket}")
        s3 = Minio(endpoint=endpoint, access_key=access_key, secret_key=secret_key, secure=self.backend_secure)
        try:
            s3.bucket_exists(self.backend_bucket)
        except S3Error as s3e:
            # Translate the common S3 error codes into actionable messages before bailing out.
            if s3e.code == "SignatureDoesNotMatch":
                logger.critical("Invalid secret key provided for the secret backend bucket user.")
            if s3e.code == "InvalidAccessKeyId":
                logger.critical("Invalid access key ID provided for the secret backend bucket user.")
            if s3e.code == "AccessDenied":
                logger.critical(
                    "Access denied for the secret backend bucket user. Does the bucket exist, and does the "
                    "user have the correct permissions to the bucket?"
                )
            sys.exit(20)
        return s3

    def setup_backend(self):
        """We dynamically configure the backend depending on the given backend type."""
        logger.debug(f"Configuring SecretManager with backend {self.backend_type}")
        method_name = f"retrieve_{self.backend_type}_backend"
        method = getattr(self, method_name)
        return method()

    def get_credentials(self, name: str, required: bool = False) -> ServiceAccount:
        """Get a password from the configured secret backend.

        Args:
            name (str): the name of the password entry
            required (bool): whether the credentials must exist

        Returns: ServiceAccount
        """
        # Dispatch to the backend-specific implementation, e.g. keepass_get_credentials.
        method_name = f"{self.backend_type}_get_credentials"
        method = getattr(self, method_name)
        return method(name, required)

    def set_password(self, credentials: ServiceAccount):
        """Store the given credentials via the backend-specific implementation."""
        method_name = f"{self.backend_type}_set_password"
        method = getattr(self, method_name)
        # Mark the backend as modified so cleanup() persists it.
        self.backend_dirty = True
        return method(credentials)

    def retrieve_dummy_backend(self, config):
        raise NotImplementedError

    def dummy_get_credentials(self, name):
        raise NotImplementedError

    def dummy_set_password(self, credentials: ServiceAccount):
        raise NotImplementedError

    def retrieve_keepass_backend(self) -> PyKeePass:
        """Back-end implementation for the keepass backend.
        Two-step process:
            - first we retrieve the kdbx file from the S3 bucket
            - then we configure the PyKeePass backend

        Returns: PyKeePass object, with the kdbx file loaded

        """
        self.backend_filename = settings.keepass_filename
        tmp_file = NamedTemporaryFile(prefix=f"mm.{self.backend_filename}.", delete=False)
        self.keepass_temp_file = tmp_file
        response = None
        try:
            response = self.backend_s3.get_object(self.backend_bucket, self.backend_filename)
            with tmp_file as f:
                logger.debug(f"Writing kdbx file to temp file {tmp_file.name}")
                f.write(response.data)
        except S3Error as s3e:
            logger.debug(s3e)
            logger.critical(
                f"Unable to retrieve {self.backend_filename} from {self.backend_bucket}!\n"
                "Do the required bucket and kdbx file exist, and does the user have the correct "
                "policies assigned?"
            )
            sys.exit(21)
        finally:
            # Bug fix: get_object can fail before `response` is bound; the
            # unconditional close()/release_conn() here used to raise an
            # UnboundLocalError that masked the real error.
            if response is not None:
                response.close()
                response.release_conn()

        kp_pass = settings.keepass_password
        logger.debug("Opening keepass database")
        try:
            kp = PyKeePass(self.keepass_temp_file.name, password=kp_pass)
        except CredentialsError:
            logger.critical("Invalid credentials for Keepass database.")
            sys.exit(22)
        # noinspection PyTypeChecker
        self.keepass_group = kp.find_groups(path=["s3", settings.cluster_name])
        if not self.keepass_group:
            logger.critical("Required group not found in Keepass! See documentation for requirements.")
            sys.exit(23)
        logger.debug("Keepass configured as secret backend")
        return kp

    def keepass_get_credentials(self, name: str, required: bool) -> ServiceAccount:
        """Get a password from the configured Keepass database.

        Args:
            name (str): the name of the password entry
            required (bool): if the entry must exist

        Returns:
            ServiceAccount
        """
        logger.debug(f"Finding Keepass entry for {name}")
        entry = self.backend.find_entries(title=name, group=self.keepass_group, first=True)

        try:
            # entry is None when no match was found; the attribute access then
            # raises AttributeError, which we use to detect the missing entry.
            credentials = ServiceAccount(name=name, access_key=entry.username, secret_key=entry.password)
            logger.debug(f"Found access key {credentials.access_key}")
        except AttributeError as ae:
            # ae.obj (Python 3.10+) is the object the attribute lookup failed
            # on; falsy means the entry itself was missing.
            if not ae.obj:
                if required:
                    logger.critical(f"Required entry for {name} not found!")
                    sys.exit(24)
                return ServiceAccount(name=name)
            # NOTE(review): this path falls through and implicitly returns None — confirm intended.
            logger.critical(f"Unhandled exception: {ae}")
        else:
            return credentials

    def keepass_set_password(self, credentials: ServiceAccount):
        """Set the password for the given credentials.

        Args:
            credentials (ServiceAccount): the credentials to set
        """
        logger.debug(f"Creating Keepass entry '{credentials.name}' with access key '{credentials.access_key}'")
        self.backend.add_entry(
            destination_group=self.keepass_group,
            title=credentials.name,
            username=credentials.access_key,
            password=credentials.secret_key,
        )

    def cleanup(self):
        """Persist a modified backend and remove the temporary kdbx file."""
        if not self.backend_dirty:
            if self.keepass_temp_file:
                self.keepass_temp_file.close()
                Path(self.keepass_temp_file.name).unlink(missing_ok=True)
            return

        # If we have dirty back-ends, we want to ensure they are saved before exiting.
        if self.backend_type == "keepass":
            # The PyKeePass save() function can take some time. So we want to run it once when the application is
            # exiting, not every time after creating or updating an entry.
            # After saving, upload the updated file to the S3 bucket and clean up the temp file.
            if isinstance(self.backend, PyKeePass):
                t_filename = self.keepass_temp_file.name  # temp file name
                s_bucket_name = self.backend_bucket  # bucket name
                s_filename = self.backend_filename  # file name in bucket
                logger.info(f"Saving modified {s_filename} and uploading back to bucket {s_bucket_name}.")
                logger.debug(f"Saving temp file {t_filename}")
                self.backend.save()
                logger.debug(f"Uploading {t_filename} to bucket {s_bucket_name}")
                self.backend_s3.fput_object(s_bucket_name, s_filename, t_filename)
                logger.info(f"Successfully saved modified {s_filename}.")
            logger.debug(f"Cleaning up {self.keepass_temp_file.name}")
            self.keepass_temp_file.close()
            Path(self.keepass_temp_file.name).unlink(missing_ok=True)

get_credentials(name, required=False)

Get a password from the configured secret backend.

Parameters:

Name Type Description Default
name str

the name of the password entry

required
required bool

whether the credentials must exist

False

Returns: ServiceAccount

Source code in minio_manager/classes/secrets.py
60
61
62
63
64
65
66
67
68
69
70
71
def get_credentials(self, name: str, required: bool = False) -> ServiceAccount:
    """Get a password from the configured secret backend.

    Dispatches to the backend-specific implementation, e.g.
    keepass_get_credentials for the keepass backend.

    Args:
        name (str): the name of the password entry
        required (bool): whether the credentials must exist

    Returns: ServiceAccount
    """
    backend_method = getattr(self, f"{self.backend_type}_get_credentials")
    return backend_method(name, required)

keepass_get_credentials(name, required)

Get a password from the configured Keepass database.

Parameters:

Name Type Description Default
name str

the name of the password entry

required
required bool

if the entry must exist

required

Returns:

Type Description
ServiceAccount

ServiceAccount

Source code in minio_manager/classes/secrets.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
def keepass_get_credentials(self, name: str, required: bool) -> ServiceAccount:
    """Get a password from the configured Keepass database.

    Args:
        name (str): the name of the password entry
        required (bool): if the entry must exist

    Returns:
        ServiceAccount: credentials with access/secret key populated when the
        entry exists; a ServiceAccount with only the name set when the entry
        is missing and not required.

    Raises:
        SystemExit: exit code 24 when the entry is required but not found.
    """
    logger.debug(f"Finding Keepass entry for {name}")
    entry = self.backend.find_entries(title=name, group=self.keepass_group, first=True)

    try:
        # entry is None when not found; attribute access then raises AttributeError
        credentials = ServiceAccount(name=name, access_key=entry.username, secret_key=entry.password)
        logger.debug(f"Found access key {credentials.access_key}")
    except AttributeError as ae:
        # AttributeError.obj (Python 3.11+) is the object the failed lookup was
        # performed on; a falsy obj means the entry itself was missing.
        if not ae.obj:
            if required:
                logger.critical(f"Required entry for {name} not found!")
                sys.exit(24)
            return ServiceAccount(name=name)
        logger.critical(f"Unhandled exception: {ae}")
        # Bug fix: previously this path fell through and implicitly returned
        # None, violating the declared return type; re-raise instead so the
        # failure is not silently swallowed.
        raise
    else:
        return credentials

keepass_set_password(credentials)

Set the password for the given credentials.

Parameters:

Name Type Description Default
credentials ServiceAccount

the credentials to set

required
Source code in minio_manager/classes/secrets.py
158
159
160
161
162
163
164
165
166
167
168
169
170
def keepass_set_password(self, credentials: ServiceAccount):
    """Store the given service account as an entry in the Keepass database.

    Args:
        credentials (ServiceAccount): the credentials to set
    """
    logger.debug(f"Creating Keepass entry '{credentials.name}' with access key '{credentials.access_key}'")
    entry_fields = {
        "destination_group": self.keepass_group,
        "title": credentials.name,
        "username": credentials.access_key,
        "password": credentials.secret_key,
    }
    self.backend.add_entry(**entry_fields)

retrieve_keepass_backend()

Back-end implementation for the keepass backend. Two-step process: first we retrieve the kdbx file from the S3 bucket, then we configure the PyKeePass backend.

Returns: PyKeePass object, with the kdbx file loaded

Source code in minio_manager/classes/secrets.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def retrieve_keepass_backend(self) -> PyKeePass:
    """Back-end implementation for the keepass backend.
    Two-step process:
        - first we retrieve the kdbx file from the S3 bucket
        - then we configure the PyKeePass backend

    Returns: PyKeePass object, with the kdbx file loaded

    Raises:
        SystemExit: 21 when the kdbx file cannot be retrieved, 22 when the
            Keepass password is wrong, 23 when the required group is missing.
    """
    self.backend_filename = settings.keepass_filename
    # delete=False: the temp file must outlive this method so PyKeePass can
    # open it below; it is removed explicitly during cleanup.
    tmp_file = NamedTemporaryFile(prefix=f"mm.{self.backend_filename}.", delete=False)
    self.keepass_temp_file = tmp_file
    response = None  # bug fix: ensure the name is bound even if get_object() raises
    try:
        response = self.backend_s3.get_object(self.backend_bucket, self.backend_filename)
        with tmp_file as f:
            logger.debug(f"Writing kdbx file to temp file {tmp_file.name}")
            f.write(response.data)
    except S3Error as s3e:
        logger.debug(s3e)
        logger.critical(
            f"Unable to retrieve {self.backend_filename} from {self.backend_bucket}!\n"
            "Do the required bucket and kdbx file exist, and does the user have the correct "
            "policies assigned?"
        )
        sys.exit(21)
    finally:
        # Bug fix: the response was previously closed unconditionally; when
        # get_object() failed, that raised UnboundLocalError in the finally
        # block and masked the intended sys.exit(21).
        if response is not None:
            response.close()
            response.release_conn()

    kp_pass = settings.keepass_password
    logger.debug("Opening keepass database")
    try:
        kp = PyKeePass(self.keepass_temp_file.name, password=kp_pass)
    except CredentialsError:
        logger.critical("Invalid credentials for Keepass database.")
        sys.exit(22)
    # noinspection PyTypeChecker
    self.keepass_group = kp.find_groups(path=["s3", settings.cluster_name])
    if not self.keepass_group:
        logger.critical("Required group not found in Keepass! See documentation for requirements.")
        sys.exit(23)
    logger.debug("Keepass configured as secret backend")
    return kp

setup_backend()

We dynamically configure the backend depending on the given backend type.

Source code in minio_manager/classes/secrets.py
53
54
55
56
57
58
def setup_backend(self):
    """Dynamically configure and return the backend matching the configured backend type."""
    logger.debug(f"Configuring SecretManager with backend {self.backend_type}")
    retriever = getattr(self, f"retrieve_{self.backend_type}_backend")
    return retriever()

Bases: BaseSettings

The Settings class is responsible for loading the settings from environment variables and the dotenv file, and making them available to the rest of the application.

Source code in minio_manager/classes/settings.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
class Settings(BaseSettings):
    """
    The Settings class is responsible for loading the settings from environment variables and the dotenv file, and
    making them available to the rest of the application.
    """

    # All settings are read from MINIO_MANAGER_-prefixed environment variables,
    # falling back to config.env; unknown variables are ignored.
    model_config = SettingsConfigDict(
        env_prefix="MINIO_MANAGER_", env_file="config.env", env_file_encoding="utf-8", extra="ignore"
    )

    log_level: str = "INFO"

    # Cluster connection details; fields without defaults are required.
    cluster_name: str
    s3_endpoint: str
    s3_endpoint_secure: bool = True

    minio_controller_user: str
    cluster_resources_file: str = "resources.yaml"

    # Secret backend configuration (e.g. the S3 bucket holding the kdbx file).
    secret_backend_type: str
    secret_backend_s3_bucket: str = "minio-manager-secrets"
    secret_backend_s3_access_key: str
    secret_backend_s3_secret_key: str

    # Keepass-specific settings; the password may also come from the backend.
    keepass_filename: str = "secrets.kdbx"
    keepass_password: str | None = None

    # Behavioral defaults applied when cluster resources omit these options.
    auto_create_service_account: bool = True
    allowed_bucket_prefixes: tuple[str, ...] = ()
    default_bucket_versioning: str = "Suspended"
    default_lifecycle_policy_file: str | None = None
    service_account_policy_base_file: str = ""

    @classmethod
    def settings_customise_sources(
        cls,
        settings_cls: type[BaseSettings],
        init_settings: PydanticBaseSettingsSource,
        env_settings: CustomEnvSettingsSource,
        dotenv_settings: PydanticBaseSettingsSource,
        file_secret_settings: PydanticBaseSettingsSource,
    ) -> tuple[PydanticBaseSettingsSource, ...]:
        # Replace pydantic's default sources with the project's custom env and
        # dotenv sources; order determines precedence (env wins over dotenv).
        return (
            CustomEnvSettingsSource(settings_cls),
            CustomDotEnvSettingsSource(settings_cls),
            init_settings,
        )

ControllerUser represents the controller user of our application.

name: The name of the controller user. access_key: The access key of the controller user. secret_key: The secret key of the controller user.

Source code in minio_manager/classes/controller_user.py
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
class ControllerUser:
    """Represents the controller user of our application.

    Attributes:
        name: The name of the controller user
        access_key: The access key of the controller user
        secret_key: The secret key of the controller user
    """

    name: str
    access_key: str
    secret_key: str

    def __init__(self, name: str):
        # Resolve the user's keys from the configured secret backend.
        self.name = name
        found = secrets.get_credentials(name)
        self.access_key = found.access_key
        self.secret_key = found.secret_key