From 15a6e0868097ec8a6ef97b9fde59a9486270fc37 Mon Sep 17 00:00:00 2001 From: Jack Date: Tue, 21 Feb 2023 23:39:43 +0800 Subject: [PATCH] Set ownership for new folders in Write Files Module (#1980) Reference:https://github.com/canonical/cloud-init/commit/15a6e0868097ec8a6ef97b9fde59a9486270fc37 Conflict:(1)cloudinit/config/schemas/schema-cloud-config-v1.json not changed. (2)tools/.github-cla-signers not changed. (3)tests/integration_tests/modules/test_write_files.py not changed. Integration tests are currently Ubuntu-only. https://github.com/canonical/cloud-init/issues/4290#issuecomment-1660200921 The parent directory is created automatically if it does not exist, but the ownership of a newly-created parent directory would always be root. With this change, it is set to the same owner as the file being written. LP: #1990513 --- cloudinit/config/cc_write_files.py | 4 +++- cloudinit/util.py | 38 ++++++++++++++++++++++++++++-- tests/unittests/test_util.py | 20 ++++++++++++++++ 3 files changed, 59 insertions(+), 3 deletions(-) diff --git a/cloudinit/config/cc_write_files.py b/cloudinit/config/cc_write_files.py index b1678b8..6edaa85 100644 --- a/cloudinit/config/cc_write_files.py +++ b/cloudinit/config/cc_write_files.py @@ -241,7 +241,9 @@ def write_files(name, files): (u, g) = util.extract_usergroup(f_info.get('owner', DEFAULT_OWNER)) perms = decode_perms(f_info.get('permissions'), DEFAULT_PERMS) omode = 'ab' if util.get_cfg_option_bool(f_info, 'append') else 'wb' - util.write_file(path, contents, omode=omode, mode=perms) + util.write_file( + path, contents, omode=omode, mode=perms, user=u, group=g + ) util.chownbyname(path, u, g) diff --git a/cloudinit/util.py b/cloudinit/util.py index 2b8adf3..d4a6eed 100644 --- a/cloudinit/util.py +++ b/cloudinit/util.py @@ -35,6 +35,7 @@ from base64 import b64decode, b64encode from errno import ENOENT from collections import namedtuple from functools import lru_cache, total_ordering +from pathlib import Path from urllib import parse from typing import List 
@@ -1648,12 +1649,41 @@ def json_dumps(data): raise -def ensure_dir(path, mode=None): +def get_non_exist_parent_dir(path): + """Get the last directory in a path that does not exist. + + Example: when path=/usr/a/b and /usr/a does not exis but /usr does, + return /usr/a + """ + p_path = os.path.dirname(path) + # Check if parent directory of path is root + if p_path == os.path.dirname(p_path): + return path + else: + if os.path.isdir(p_path): + return path + else: + return get_non_exist_parent_dir(p_path) + + +def ensure_dir(path, mode=None, user=None, group=None): if not os.path.isdir(path): + # Get non existed parent dir first before they are created. + non_existed_parent_dir = get_non_exist_parent_dir(path) # Make the dir and adjust the mode with SeLinuxGuard(os.path.dirname(path), recursive=True): os.makedirs(path) chmod(path, mode) + # Change the ownership + if user or group: + chownbyname(non_existed_parent_dir, user, group) + # if path=/usr/a/b/c and non_existed_parent_dir=/usr, + # then sub_relative_dir=PosixPath('a/b/c') + sub_relative_dir = Path(path.split(non_existed_parent_dir)[1][1:]) + sub_path = Path(non_existed_parent_dir) + for part in sub_relative_dir.parts: + sub_path = sub_path.joinpath(part) + chownbyname(sub_path, user, group) else: # Just adjust the mode chmod(path, mode) @@ -1972,6 +2002,8 @@ def write_file( preserve_mode=False, *, ensure_dir_exists=True, + user=None, + group=None, ): """ Writes a file with the given content and sets the file mode as specified. @@ -1986,6 +2018,8 @@ def write_file( @param ensure_dir_exists: If True (the default), ensure that the directory containing `filename` exists before writing to the file. + @param user: The user to set on the file. + @param group: The group to set on the file. 
""" if preserve_mode: @@ -1995,7 +2029,7 @@ def write_file( pass if ensure_dir_exists: - ensure_dir(os.path.dirname(filename)) + ensure_dir(os.path.dirname(filename), user=user, group=group) if 'b' in omode.lower(): content = encode_text(content) write_type = 'bytes' diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py index 3fa5059..2ab3bad 100644 --- a/tests/unittests/test_util.py +++ b/tests/unittests/test_util.py @@ -10,6 +10,7 @@ import tempfile import pytest import yaml from unittest import mock +from pathlib import Path from cloudinit import subp from cloudinit import importer, util @@ -99,6 +100,25 @@ class TestWriteFile(helpers.TestCase): self.assertTrue(os.path.isdir(dirname)) self.assertTrue(os.path.isfile(path)) + def test_dir_ownership(self): + """Verifiy that directories is created with appropriate ownership.""" + dirname = os.path.join(self.tmp, "subdir", "subdir2") + path = os.path.join(dirname, "NewFile.txt") + contents = "Hey there" + user = "foo" + group = "foo" + + with mock.patch.object( + util, "chownbyname", return_value=None + ) as mockobj: + util.write_file(path, contents, user=user, group=group) + + calls = [ + mock.call(os.path.join(self.tmp, "subdir"), user, group), + mock.call(Path(dirname), user, group), + ] + mockobj.assert_has_calls(calls, any_order=False) + def test_dir_is_not_created_if_ensure_dir_false(self): """Verify directories are not created if ensure_dir_exists is False.""" dirname = os.path.join(self.tmp, "subdir") -- 2.27.0