cloud-init/backport-Set-ownership-for-new-folders-in-Write-Files-Module-.patch
2023-08-26 14:46:22 +08:00

166 lines
5.9 KiB
Diff

From 15a6e0868097ec8a6ef97b9fde59a9486270fc37 Mon Sep 17 00:00:00 2001
From: Jack <jack4zhang@gmail.com>
Date: Tue, 21 Feb 2023 23:39:43 +0800
Subject: [PATCH] Set ownership for new folders in Write Files Module
(#1980)
Reference:https://github.com/canonical/cloud-init/commit/15a6e0868097ec8a6ef97b9fde59a9486270fc37
Conflict:(1)cloudinit/config/schemas/schema-cloud-config-v1.json not
change.
(2)tools/.github-cla-signers not change.
(3)tests/integration_tests/modules/test_write_files.py not change.
Integration tests are current Ubuntu-only.
https://github.com/canonical/cloud-init/issues/4290#issuecomment-1660200921
The parent directory would be created automatically if it does not
exist. But the ownership of newly-created parent
directory would always be root.
With this change, it would be set the same as .
LP: #1990513
---
cloudinit/config/cc_write_files.py | 4 +++-
cloudinit/util.py | 38 ++++++++++++++++++++++++++++--
tests/unittests/test_util.py | 20 ++++++++++++++++
3 files changed, 59 insertions(+), 3 deletions(-)
diff --git a/cloudinit/config/cc_write_files.py b/cloudinit/config/cc_write_files.py
index b1678b8..6edaa85 100644
--- a/cloudinit/config/cc_write_files.py
+++ b/cloudinit/config/cc_write_files.py
@@ -241,7 +241,9 @@ def write_files(name, files):
(u, g) = util.extract_usergroup(f_info.get('owner', DEFAULT_OWNER))
perms = decode_perms(f_info.get('permissions'), DEFAULT_PERMS)
omode = 'ab' if util.get_cfg_option_bool(f_info, 'append') else 'wb'
- util.write_file(path, contents, omode=omode, mode=perms)
+ util.write_file(
+ path, contents, omode=omode, mode=perms, user=u, group=g
+ )
util.chownbyname(path, u, g)
diff --git a/cloudinit/util.py b/cloudinit/util.py
index 2b8adf3..d4a6eed 100644
--- a/cloudinit/util.py
+++ b/cloudinit/util.py
@@ -35,6 +35,7 @@ from base64 import b64decode, b64encode
from errno import ENOENT
from collections import namedtuple
from functools import lru_cache, total_ordering
+from pathlib import Path
from urllib import parse
from typing import List
@@ -1648,12 +1649,41 @@ def json_dumps(data):
raise
-def ensure_dir(path, mode=None):
+def get_non_exist_parent_dir(path):
+ """Get the last directory in a path that does not exist.
+
+ Example: when path=/usr/a/b and /usr/a does not exis but /usr does,
+ return /usr/a
+ """
+ p_path = os.path.dirname(path)
+ # Check if parent directory of path is root
+ if p_path == os.path.dirname(p_path):
+ return path
+ else:
+ if os.path.isdir(p_path):
+ return path
+ else:
+ return get_non_exist_parent_dir(p_path)
+
+
+def ensure_dir(path, mode=None, user=None, group=None):
if not os.path.isdir(path):
+ # Get non existed parent dir first before they are created.
+ non_existed_parent_dir = get_non_exist_parent_dir(path)
# Make the dir and adjust the mode
with SeLinuxGuard(os.path.dirname(path), recursive=True):
os.makedirs(path)
chmod(path, mode)
+ # Change the ownership
+ if user or group:
+ chownbyname(non_existed_parent_dir, user, group)
+ # if path=/usr/a/b/c and non_existed_parent_dir=/usr,
+ # then sub_relative_dir=PosixPath('a/b/c')
+ sub_relative_dir = Path(path.split(non_existed_parent_dir)[1][1:])
+ sub_path = Path(non_existed_parent_dir)
+ for part in sub_relative_dir.parts:
+ sub_path = sub_path.joinpath(part)
+ chownbyname(sub_path, user, group)
else:
# Just adjust the mode
chmod(path, mode)
@@ -1972,6 +2002,8 @@ def write_file(
preserve_mode=False,
*,
ensure_dir_exists=True,
+ user=None,
+ group=None,
):
"""
Writes a file with the given content and sets the file mode as specified.
@@ -1986,6 +2018,8 @@ def write_file(
@param ensure_dir_exists: If True (the default), ensure that the directory
containing `filename` exists before writing to
the file.
+ @param user: The user to set on the file.
+ @param group: The group to set on the file.
"""
if preserve_mode:
@@ -1995,7 +2029,7 @@ def write_file(
pass
if ensure_dir_exists:
- ensure_dir(os.path.dirname(filename))
+ ensure_dir(os.path.dirname(filename), user=user, group=group)
if 'b' in omode.lower():
content = encode_text(content)
write_type = 'bytes'
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index 3fa5059..2ab3bad 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -10,6 +10,7 @@ import tempfile
import pytest
import yaml
from unittest import mock
+from pathlib import Path
from cloudinit import subp
from cloudinit import importer, util
@@ -99,6 +100,25 @@ class TestWriteFile(helpers.TestCase):
self.assertTrue(os.path.isdir(dirname))
self.assertTrue(os.path.isfile(path))
+ def test_dir_ownership(self):
+ """Verifiy that directories is created with appropriate ownership."""
+ dirname = os.path.join(self.tmp, "subdir", "subdir2")
+ path = os.path.join(dirname, "NewFile.txt")
+ contents = "Hey there"
+ user = "foo"
+ group = "foo"
+
+ with mock.patch.object(
+ util, "chownbyname", return_value=None
+ ) as mockobj:
+ util.write_file(path, contents, user=user, group=group)
+
+ calls = [
+ mock.call(os.path.join(self.tmp, "subdir"), user, group),
+ mock.call(Path(dirname), user, group),
+ ]
+ mockobj.assert_has_calls(calls, any_order=False)
+
def test_dir_is_not_created_if_ensure_dir_false(self):
"""Verify directories are not created if ensure_dir_exists is False."""
dirname = os.path.join(self.tmp, "subdir")
--
2.27.0