test: sort imports and use "new" string formatting
Followed PEP8 and PEP3101 rules (#8079) Imports re-ordered by Alphabetical Standarts for following PEP8 Old type string formattings (" example %s " % exampleVar ) re-writed as new type string formattings ( " example {} ".format(exampleVar) ) for following PEP3101
This commit is contained in:
parent
ec79af69a1
commit
278391c2fe
|
@ -33,13 +33,13 @@
|
||||||
|
|
||||||
import errno
|
import errno
|
||||||
import os
|
import os
|
||||||
import sys
|
|
||||||
import time
|
|
||||||
import unittest
|
|
||||||
import tempfile
|
|
||||||
import subprocess
|
|
||||||
import shutil
|
import shutil
|
||||||
import socket
|
import socket
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
import tempfile
|
||||||
|
import time
|
||||||
|
import unittest
|
||||||
|
|
||||||
HAVE_DNSMASQ = shutil.which('dnsmasq') is not None
|
HAVE_DNSMASQ = shutil.which('dnsmasq') is not None
|
||||||
|
|
||||||
|
@ -93,8 +93,8 @@ class NetworkdTestingUtilities:
|
||||||
|
|
||||||
def write_network_dropin(self, unit_name, dropin_name, contents):
|
def write_network_dropin(self, unit_name, dropin_name, contents):
|
||||||
"""Write a network unit drop-in, and queue it to be removed."""
|
"""Write a network unit drop-in, and queue it to be removed."""
|
||||||
dropin_dir = os.path.join(NETWORK_UNITDIR, "%s.d" % unit_name)
|
dropin_dir = os.path.join(NETWORK_UNITDIR, "{}.d".format(unit_name))
|
||||||
dropin_path = os.path.join(dropin_dir, "%s.conf" % dropin_name)
|
dropin_path = os.path.join(dropin_dir, "{}.conf".format(dropin_name))
|
||||||
|
|
||||||
os.makedirs(dropin_dir, exist_ok=True)
|
os.makedirs(dropin_dir, exist_ok=True)
|
||||||
self.addCleanup(os.rmdir, dropin_dir)
|
self.addCleanup(os.rmdir, dropin_dir)
|
||||||
|
@ -130,7 +130,7 @@ class NetworkdTestingUtilities:
|
||||||
|
|
||||||
# Wait for the requested interfaces, but don't fail for them.
|
# Wait for the requested interfaces, but don't fail for them.
|
||||||
subprocess.call([NETWORKD_WAIT_ONLINE, '--timeout=5'] +
|
subprocess.call([NETWORKD_WAIT_ONLINE, '--timeout=5'] +
|
||||||
['--interface=%s' % iface for iface in kwargs])
|
['--interface={}'.format(iface) for iface in kwargs])
|
||||||
|
|
||||||
# Validate each link state found in the networkctl output.
|
# Validate each link state found in the networkctl output.
|
||||||
out = subprocess.check_output(['networkctl', '--no-legend']).rstrip()
|
out = subprocess.check_output(['networkctl', '--no-legend']).rstrip()
|
||||||
|
@ -142,13 +142,12 @@ class NetworkdTestingUtilities:
|
||||||
actual = fields[-1]
|
actual = fields[-1]
|
||||||
if (actual != expected and
|
if (actual != expected and
|
||||||
not (expected == 'managed' and actual != 'unmanaged')):
|
not (expected == 'managed' and actual != 'unmanaged')):
|
||||||
self.fail("Link %s expects state %s, found %s" %
|
self.fail("Link {} expects state {}, found {}".format(iface, expected, actual))
|
||||||
(iface, expected, actual))
|
|
||||||
interfaces.remove(iface)
|
interfaces.remove(iface)
|
||||||
|
|
||||||
# Ensure that all requested interfaces have been covered.
|
# Ensure that all requested interfaces have been covered.
|
||||||
if interfaces:
|
if interfaces:
|
||||||
self.fail("Missing links in status output: %s" % interfaces)
|
self.fail("Missing links in status output: {}".format(interfaces))
|
||||||
|
|
||||||
|
|
||||||
class BridgeTest(NetworkdTestingUtilities, unittest.TestCase):
|
class BridgeTest(NetworkdTestingUtilities, unittest.TestCase):
|
||||||
|
@ -257,7 +256,7 @@ class ClientTestBase(NetworkdTestingUtilities):
|
||||||
def show_journal(self, unit):
|
def show_journal(self, unit):
|
||||||
'''Show journal of given unit since start of the test'''
|
'''Show journal of given unit since start of the test'''
|
||||||
|
|
||||||
print('---- %s ----' % unit)
|
print('---- {} ----'.format(unit))
|
||||||
subprocess.check_output(['journalctl', '--sync'])
|
subprocess.check_output(['journalctl', '--sync'])
|
||||||
sys.stdout.flush()
|
sys.stdout.flush()
|
||||||
subprocess.call(['journalctl', '-b', '--no-pager', '--quiet',
|
subprocess.call(['journalctl', '-b', '--no-pager', '--quiet',
|
||||||
|
@ -287,10 +286,10 @@ class ClientTestBase(NetworkdTestingUtilities):
|
||||||
raise
|
raise
|
||||||
self.write_network(self.config, '''\
|
self.write_network(self.config, '''\
|
||||||
[Match]
|
[Match]
|
||||||
Name=%s
|
Name={}
|
||||||
[Network]
|
[Network]
|
||||||
DHCP=%s
|
DHCP={}
|
||||||
%s''' % (self.iface, dhcp_mode, extra_opts))
|
{}'''.format(self.iface, dhcp_mode, extra_opts))
|
||||||
|
|
||||||
if coldplug:
|
if coldplug:
|
||||||
# create interface first, then start networkd
|
# create interface first, then start networkd
|
||||||
|
@ -335,8 +334,8 @@ DHCP=%s
|
||||||
|
|
||||||
# check networkctl state
|
# check networkctl state
|
||||||
out = subprocess.check_output(['networkctl'])
|
out = subprocess.check_output(['networkctl'])
|
||||||
self.assertRegex(out, (r'%s\s+ether\s+[a-z-]+\s+unmanaged' % self.if_router).encode())
|
self.assertRegex(out, (r'{}\s+ether\s+[a-z-]+\s+unmanaged'.format(self.if_router)).encode())
|
||||||
self.assertRegex(out, (r'%s\s+ether\s+routable\s+configured' % self.iface).encode())
|
self.assertRegex(out, (r'{}\s+ether\s+routable\s+configured'.format(self.iface)).encode())
|
||||||
|
|
||||||
out = subprocess.check_output(['networkctl', 'status', self.iface])
|
out = subprocess.check_output(['networkctl', 'status', self.iface])
|
||||||
self.assertRegex(out, br'Type:\s+ether')
|
self.assertRegex(out, br'Type:\s+ether')
|
||||||
|
@ -352,11 +351,11 @@ DHCP=%s
|
||||||
except (AssertionError, subprocess.CalledProcessError):
|
except (AssertionError, subprocess.CalledProcessError):
|
||||||
# show networkd status, journal, and DHCP server log on failure
|
# show networkd status, journal, and DHCP server log on failure
|
||||||
with open(os.path.join(NETWORK_UNITDIR, self.config)) as f:
|
with open(os.path.join(NETWORK_UNITDIR, self.config)) as f:
|
||||||
print('\n---- %s ----\n%s' % (self.config, f.read()))
|
print('\n---- {} ----\n{}'.format(self.config, f.read()))
|
||||||
print('---- interface status ----')
|
print('---- interface status ----')
|
||||||
sys.stdout.flush()
|
sys.stdout.flush()
|
||||||
subprocess.call(['ip', 'a', 'show', 'dev', self.iface])
|
subprocess.call(['ip', 'a', 'show', 'dev', self.iface])
|
||||||
print('---- networkctl status %s ----' % self.iface)
|
print('---- networkctl status {} ----'.format(self.iface))
|
||||||
sys.stdout.flush()
|
sys.stdout.flush()
|
||||||
subprocess.call(['networkctl', 'status', self.iface])
|
subprocess.call(['networkctl', 'status', self.iface])
|
||||||
self.show_journal('systemd-networkd.service')
|
self.show_journal('systemd-networkd.service')
|
||||||
|
@ -513,7 +512,7 @@ class DnsmasqClientTest(ClientTestBase, unittest.TestCase):
|
||||||
'''Print DHCP server log for debugging failures'''
|
'''Print DHCP server log for debugging failures'''
|
||||||
|
|
||||||
with open(self.dnsmasq_log) as f:
|
with open(self.dnsmasq_log) as f:
|
||||||
sys.stdout.write('\n\n---- dnsmasq log ----\n%s\n------\n\n' % f.read())
|
sys.stdout.write('\n\n---- dnsmasq log ----\n{}\n------\n\n'.format(f.read()))
|
||||||
|
|
||||||
def test_resolved_domain_restricted_dns(self):
|
def test_resolved_domain_restricted_dns(self):
|
||||||
'''resolved: domain-restricted DNS servers'''
|
'''resolved: domain-restricted DNS servers'''
|
||||||
|
@ -523,10 +522,10 @@ class DnsmasqClientTest(ClientTestBase, unittest.TestCase):
|
||||||
self.create_iface(dnsmasq_opts=['--address=/#/192.168.42.1'])
|
self.create_iface(dnsmasq_opts=['--address=/#/192.168.42.1'])
|
||||||
self.write_network('general.network', '''\
|
self.write_network('general.network', '''\
|
||||||
[Match]
|
[Match]
|
||||||
Name=%s
|
Name={}
|
||||||
[Network]
|
[Network]
|
||||||
DHCP=ipv4
|
DHCP=ipv4
|
||||||
IPv6AcceptRA=False''' % self.iface)
|
IPv6AcceptRA=False'''.format(self.iface))
|
||||||
|
|
||||||
# create second device/dnsmasq for a .company/.lab VPN interface
|
# create second device/dnsmasq for a .company/.lab VPN interface
|
||||||
# static IPs for simplicity
|
# static IPs for simplicity
|
||||||
|
@ -653,7 +652,7 @@ Domains= ~company ~lab''')
|
||||||
self.addCleanup(subprocess.call, ['umount', '/etc/hostname'])
|
self.addCleanup(subprocess.call, ['umount', '/etc/hostname'])
|
||||||
subprocess.check_call(['systemctl', 'stop', 'systemd-hostnamed.service'])
|
subprocess.check_call(['systemctl', 'stop', 'systemd-hostnamed.service'])
|
||||||
|
|
||||||
self.create_iface(dnsmasq_opts=['--dhcp-host=%s,192.168.5.210,testgreen' % self.iface_mac])
|
self.create_iface(dnsmasq_opts=['--dhcp-host={},192.168.5.210,testgreen'.format(self.iface_mac)])
|
||||||
self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=False', dhcp_mode='ipv4')
|
self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=False', dhcp_mode='ipv4')
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
@ -670,7 +669,7 @@ Domains= ~company ~lab''')
|
||||||
sys.stdout.write('[retry %i] ' % retry)
|
sys.stdout.write('[retry %i] ' % retry)
|
||||||
sys.stdout.flush()
|
sys.stdout.flush()
|
||||||
else:
|
else:
|
||||||
self.fail('Transient hostname not found in hostnamectl:\n%s' % out.decode())
|
self.fail('Transient hostname not found in hostnamectl:\n{}'.format(out.decode()))
|
||||||
# and also applied to the system
|
# and also applied to the system
|
||||||
self.assertEqual(socket.gethostname(), 'testgreen')
|
self.assertEqual(socket.gethostname(), 'testgreen')
|
||||||
except AssertionError:
|
except AssertionError:
|
||||||
|
@ -688,7 +687,7 @@ Domains= ~company ~lab''')
|
||||||
self.writeConfig('/etc/hostname', orig_hostname)
|
self.writeConfig('/etc/hostname', orig_hostname)
|
||||||
subprocess.check_call(['systemctl', 'stop', 'systemd-hostnamed.service'])
|
subprocess.check_call(['systemctl', 'stop', 'systemd-hostnamed.service'])
|
||||||
|
|
||||||
self.create_iface(dnsmasq_opts=['--dhcp-host=%s,192.168.5.210,testgreen' % self.iface_mac])
|
self.create_iface(dnsmasq_opts=['--dhcp-host={},192.168.5.210,testgreen'.format(self.iface_mac)])
|
||||||
self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=False', dhcp_mode='ipv4')
|
self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=False', dhcp_mode='ipv4')
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
@ -942,9 +941,9 @@ class MatchClientTest(unittest.TestCase, NetworkdTestingUtilities):
|
||||||
['addr', mac], ['addr', mac])
|
['addr', mac], ['addr', mac])
|
||||||
self.write_network('no-veth.network', """\
|
self.write_network('no-veth.network', """\
|
||||||
[Match]
|
[Match]
|
||||||
MACAddress=%s
|
MACAddress={}
|
||||||
Name=!nonexistent *peer*
|
Name=!nonexistent *peer*
|
||||||
[Network]""" % mac)
|
[Network]""".format(mac))
|
||||||
subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
|
subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
|
||||||
self.assert_link_states(test_veth='managed', test_peer='unmanaged')
|
self.assert_link_states(test_veth='managed', test_peer='unmanaged')
|
||||||
|
|
||||||
|
|
|
@ -19,15 +19,16 @@
|
||||||
# You should have received a copy of the GNU Lesser General Public License
|
# You should have received a copy of the GNU Lesser General Public License
|
||||||
# along with systemd; If not, see <http://www.gnu.org/licenses/>.
|
# along with systemd; If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
import unittest
|
|
||||||
import sys
|
|
||||||
import os
|
|
||||||
import subprocess
|
|
||||||
import tempfile
|
|
||||||
import shutil
|
|
||||||
from glob import glob
|
|
||||||
import collections
|
import collections
|
||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
import tempfile
|
||||||
|
import unittest
|
||||||
|
|
||||||
from configparser import RawConfigParser
|
from configparser import RawConfigParser
|
||||||
|
from glob import glob
|
||||||
|
|
||||||
sysv_generator = './systemd-sysv-generator'
|
sysv_generator = './systemd-sysv-generator'
|
||||||
|
|
||||||
|
@ -112,22 +113,20 @@ class SysvGeneratorTest(unittest.TestCase):
|
||||||
keys.setdefault('Required-Stop', keys['Required-Start'])
|
keys.setdefault('Required-Stop', keys['Required-Start'])
|
||||||
keys.setdefault('Default-Start', '2 3 4 5')
|
keys.setdefault('Default-Start', '2 3 4 5')
|
||||||
keys.setdefault('Default-Stop', '0 1 6')
|
keys.setdefault('Default-Stop', '0 1 6')
|
||||||
keys.setdefault('Short-Description', 'test %s service' %
|
keys.setdefault('Short-Description', 'test {} service'.format(name_without_sh))
|
||||||
name_without_sh)
|
keys.setdefault('Description', 'long description for test {} service'.format(name_without_sh))
|
||||||
keys.setdefault('Description', 'long description for test %s service' %
|
|
||||||
name_without_sh)
|
|
||||||
script = os.path.join(self.init_d_dir, fname)
|
script = os.path.join(self.init_d_dir, fname)
|
||||||
with open(script, 'w') as f:
|
with open(script, 'w') as f:
|
||||||
f.write('#!/bin/init-d-interpreter\n### BEGIN INIT INFO\n')
|
f.write('#!/bin/init-d-interpreter\n### BEGIN INIT INFO\n')
|
||||||
for k, v in keys.items():
|
for k, v in keys.items():
|
||||||
if v is not None:
|
if v is not None:
|
||||||
f.write('#%20s %s\n' % (k + ':', v))
|
f.write('#{:>20} {}\n'.format(k + ':', v))
|
||||||
f.write('### END INIT INFO\ncode --goes here\n')
|
f.write('### END INIT INFO\ncode --goes here\n')
|
||||||
os.chmod(script, 0o755)
|
os.chmod(script, 0o755)
|
||||||
|
|
||||||
if enable:
|
if enable:
|
||||||
def make_link(prefix, runlevel):
|
def make_link(prefix, runlevel):
|
||||||
d = os.path.join(self.rcnd_dir, 'rc%s.d' % runlevel)
|
d = os.path.join(self.rcnd_dir, 'rc{}.d'.format(runlevel))
|
||||||
if not os.path.isdir(d):
|
if not os.path.isdir(d):
|
||||||
os.mkdir(d)
|
os.mkdir(d)
|
||||||
os.symlink('../init.d/' + fname, os.path.join(d, prefix + fname))
|
os.symlink('../init.d/' + fname, os.path.join(d, prefix + fname))
|
||||||
|
@ -146,7 +145,7 @@ class SysvGeneratorTest(unittest.TestCase):
|
||||||
|
|
||||||
# should be enabled
|
# should be enabled
|
||||||
for target in all_targets:
|
for target in all_targets:
|
||||||
link = os.path.join(self.out_dir, '%s.target.wants' % target, unit)
|
link = os.path.join(self.out_dir, '{}.target.wants'.format(target), unit)
|
||||||
if target in targets:
|
if target in targets:
|
||||||
unit_file = os.readlink(link)
|
unit_file = os.readlink(link)
|
||||||
# os.path.exists() will fail on a dangling symlink
|
# os.path.exists() will fail on a dangling symlink
|
||||||
|
@ -154,7 +153,7 @@ class SysvGeneratorTest(unittest.TestCase):
|
||||||
self.assertEqual(os.path.basename(unit_file), unit)
|
self.assertEqual(os.path.basename(unit_file), unit)
|
||||||
else:
|
else:
|
||||||
self.assertFalse(os.path.exists(link),
|
self.assertFalse(os.path.exists(link),
|
||||||
'%s unexpectedly exists' % link)
|
'{} unexpectedly exists'.format(link))
|
||||||
|
|
||||||
#
|
#
|
||||||
# test cases
|
# test cases
|
||||||
|
@ -188,9 +187,9 @@ class SysvGeneratorTest(unittest.TestCase):
|
||||||
self.assertEqual(s.get('Service', 'Type'), 'forking')
|
self.assertEqual(s.get('Service', 'Type'), 'forking')
|
||||||
init_script = os.path.join(self.init_d_dir, 'foo')
|
init_script = os.path.join(self.init_d_dir, 'foo')
|
||||||
self.assertEqual(s.get('Service', 'ExecStart'),
|
self.assertEqual(s.get('Service', 'ExecStart'),
|
||||||
'%s start' % init_script)
|
'{} start'.format(init_script))
|
||||||
self.assertEqual(s.get('Service', 'ExecStop'),
|
self.assertEqual(s.get('Service', 'ExecStop'),
|
||||||
'%s stop' % init_script)
|
'{} stop'.format(init_script))
|
||||||
|
|
||||||
self.assertNotIn('Overwriting', err)
|
self.assertNotIn('Overwriting', err)
|
||||||
|
|
||||||
|
@ -276,7 +275,7 @@ class SysvGeneratorTest(unittest.TestCase):
|
||||||
d = os.path.join(self.rcnd_dir, 'rc2.d')
|
d = os.path.join(self.rcnd_dir, 'rc2.d')
|
||||||
if not os.path.isdir(d):
|
if not os.path.isdir(d):
|
||||||
os.mkdir(d)
|
os.mkdir(d)
|
||||||
os.symlink('../init.d/' + name, os.path.join(d, 'S%02i%s' % (prio, name)))
|
os.symlink('../init.d/' + name, os.path.join(d, 'S{:>2}{}'.format(prio, name)))
|
||||||
|
|
||||||
err, results = self.run_generator()
|
err, results = self.run_generator()
|
||||||
self.assertEqual(sorted(results), ['consumer.service', 'provider.service'])
|
self.assertEqual(sorted(results), ['consumer.service', 'provider.service'])
|
||||||
|
@ -351,9 +350,9 @@ class SysvGeneratorTest(unittest.TestCase):
|
||||||
# calls correct script with .sh
|
# calls correct script with .sh
|
||||||
init_script = os.path.join(self.init_d_dir, 'foo.sh')
|
init_script = os.path.join(self.init_d_dir, 'foo.sh')
|
||||||
self.assertEqual(s.get('Service', 'ExecStart'),
|
self.assertEqual(s.get('Service', 'ExecStart'),
|
||||||
'%s start' % init_script)
|
'{} start'.format(init_script))
|
||||||
self.assertEqual(s.get('Service', 'ExecStop'),
|
self.assertEqual(s.get('Service', 'ExecStop'),
|
||||||
'%s stop' % init_script)
|
'{} stop'.format(init_script))
|
||||||
|
|
||||||
self.assert_enabled('foo.service', ['multi-user', 'graphical'])
|
self.assert_enabled('foo.service', ['multi-user', 'graphical'])
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue