|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505 |
- import base64
- import collections
- import json
- import os
- import os.path
- import shlex
- import string
- from datetime import datetime
- from packaging.version import Version
-
- from .. import errors
- from ..constants import DEFAULT_HTTP_HOST
- from ..constants import DEFAULT_UNIX_SOCKET
- from ..constants import DEFAULT_NPIPE
- from ..constants import BYTE_UNITS
- from ..tls import TLSConfig
-
- from urllib.parse import urlparse, urlunparse
-
-
- URLComponents = collections.namedtuple(
- 'URLComponents',
- 'scheme netloc url params query fragment',
- )
-
-
- def create_ipam_pool(*args, **kwargs):
- raise errors.DeprecatedMethod(
- 'utils.create_ipam_pool has been removed. Please use a '
- 'docker.types.IPAMPool object instead.'
- )
-
-
- def create_ipam_config(*args, **kwargs):
- raise errors.DeprecatedMethod(
- 'utils.create_ipam_config has been removed. Please use a '
- 'docker.types.IPAMConfig object instead.'
- )
-
-
- def decode_json_header(header):
- data = base64.b64decode(header)
- data = data.decode('utf-8')
- return json.loads(data)
-
-
- def compare_version(v1, v2):
- """Compare docker versions
-
- >>> v1 = '1.9'
- >>> v2 = '1.10'
- >>> compare_version(v1, v2)
- 1
- >>> compare_version(v2, v1)
- -1
- >>> compare_version(v2, v2)
- 0
- """
- s1 = Version(v1)
- s2 = Version(v2)
- if s1 == s2:
- return 0
- elif s1 > s2:
- return -1
- else:
- return 1
-
-
- def version_lt(v1, v2):
- return compare_version(v1, v2) > 0
-
-
- def version_gte(v1, v2):
- return not version_lt(v1, v2)
-
-
- def _convert_port_binding(binding):
- result = {'HostIp': '', 'HostPort': ''}
- if isinstance(binding, tuple):
- if len(binding) == 2:
- result['HostPort'] = binding[1]
- result['HostIp'] = binding[0]
- elif isinstance(binding[0], str):
- result['HostIp'] = binding[0]
- else:
- result['HostPort'] = binding[0]
- elif isinstance(binding, dict):
- if 'HostPort' in binding:
- result['HostPort'] = binding['HostPort']
- if 'HostIp' in binding:
- result['HostIp'] = binding['HostIp']
- else:
- raise ValueError(binding)
- else:
- result['HostPort'] = binding
-
- if result['HostPort'] is None:
- result['HostPort'] = ''
- else:
- result['HostPort'] = str(result['HostPort'])
-
- return result
-
-
- def convert_port_bindings(port_bindings):
- result = {}
- for k, v in iter(port_bindings.items()):
- key = str(k)
- if '/' not in key:
- key += '/tcp'
- if isinstance(v, list):
- result[key] = [_convert_port_binding(binding) for binding in v]
- else:
- result[key] = [_convert_port_binding(v)]
- return result
-
-
- def convert_volume_binds(binds):
- if isinstance(binds, list):
- return binds
-
- result = []
- for k, v in binds.items():
- if isinstance(k, bytes):
- k = k.decode('utf-8')
-
- if isinstance(v, dict):
- if 'ro' in v and 'mode' in v:
- raise ValueError(
- 'Binding cannot contain both "ro" and "mode": {}'
- .format(repr(v))
- )
-
- bind = v['bind']
- if isinstance(bind, bytes):
- bind = bind.decode('utf-8')
-
- if 'ro' in v:
- mode = 'ro' if v['ro'] else 'rw'
- elif 'mode' in v:
- mode = v['mode']
- else:
- mode = 'rw'
-
- result.append(
- f'{k}:{bind}:{mode}'
- )
- else:
- if isinstance(v, bytes):
- v = v.decode('utf-8')
- result.append(
- f'{k}:{v}:rw'
- )
- return result
-
-
- def convert_tmpfs_mounts(tmpfs):
- if isinstance(tmpfs, dict):
- return tmpfs
-
- if not isinstance(tmpfs, list):
- raise ValueError(
- 'Expected tmpfs value to be either a list or a dict, found: {}'
- .format(type(tmpfs).__name__)
- )
-
- result = {}
- for mount in tmpfs:
- if isinstance(mount, str):
- if ":" in mount:
- name, options = mount.split(":", 1)
- else:
- name = mount
- options = ""
-
- else:
- raise ValueError(
- "Expected item in tmpfs list to be a string, found: {}"
- .format(type(mount).__name__)
- )
-
- result[name] = options
- return result
-
-
- def convert_service_networks(networks):
- if not networks:
- return networks
- if not isinstance(networks, list):
- raise TypeError('networks parameter must be a list.')
-
- result = []
- for n in networks:
- if isinstance(n, str):
- n = {'Target': n}
- result.append(n)
- return result
-
-
- def parse_repository_tag(repo_name):
- parts = repo_name.rsplit('@', 1)
- if len(parts) == 2:
- return tuple(parts)
- parts = repo_name.rsplit(':', 1)
- if len(parts) == 2 and '/' not in parts[1]:
- return tuple(parts)
- return repo_name, None
-
-
- def parse_host(addr, is_win32=False, tls=False):
- # Sensible defaults
- if not addr and is_win32:
- return DEFAULT_NPIPE
- if not addr or addr.strip() == 'unix://':
- return DEFAULT_UNIX_SOCKET
-
- addr = addr.strip()
-
- parsed_url = urlparse(addr)
- proto = parsed_url.scheme
- if not proto or any([x not in string.ascii_letters + '+' for x in proto]):
- # https://bugs.python.org/issue754016
- parsed_url = urlparse('//' + addr, 'tcp')
- proto = 'tcp'
-
- if proto == 'fd':
- raise errors.DockerException('fd protocol is not implemented')
-
- # These protos are valid aliases for our library but not for the
- # official spec
- if proto == 'http' or proto == 'https':
- tls = proto == 'https'
- proto = 'tcp'
- elif proto == 'http+unix':
- proto = 'unix'
-
- if proto not in ('tcp', 'unix', 'npipe', 'ssh'):
- raise errors.DockerException(
- f"Invalid bind address protocol: {addr}"
- )
-
- if proto == 'tcp' and not parsed_url.netloc:
- # "tcp://" is exceptionally disallowed by convention;
- # omitting a hostname for other protocols is fine
- raise errors.DockerException(
- f'Invalid bind address format: {addr}'
- )
-
- if any([
- parsed_url.params, parsed_url.query, parsed_url.fragment,
- parsed_url.password
- ]):
- raise errors.DockerException(
- f'Invalid bind address format: {addr}'
- )
-
- if parsed_url.path and proto == 'ssh':
- raise errors.DockerException(
- 'Invalid bind address format: no path allowed for this protocol:'
- ' {}'.format(addr)
- )
- else:
- path = parsed_url.path
- if proto == 'unix' and parsed_url.hostname is not None:
- # For legacy reasons, we consider unix://path
- # to be valid and equivalent to unix:///path
- path = '/'.join((parsed_url.hostname, path))
-
- netloc = parsed_url.netloc
- if proto in ('tcp', 'ssh'):
- port = parsed_url.port or 0
- if port <= 0:
- if proto != 'ssh':
- raise errors.DockerException(
- 'Invalid bind address format: port is required:'
- ' {}'.format(addr)
- )
- port = 22
- netloc = f'{parsed_url.netloc}:{port}'
-
- if not parsed_url.hostname:
- netloc = f'{DEFAULT_HTTP_HOST}:{port}'
-
- # Rewrite schemes to fit library internals (requests adapters)
- if proto == 'tcp':
- proto = 'http{}'.format('s' if tls else '')
- elif proto == 'unix':
- proto = 'http+unix'
-
- if proto in ('http+unix', 'npipe'):
- return f"{proto}://{path}".rstrip('/')
-
- return urlunparse(URLComponents(
- scheme=proto,
- netloc=netloc,
- url=path,
- params='',
- query='',
- fragment='',
- )).rstrip('/')
-
-
- def parse_devices(devices):
- device_list = []
- for device in devices:
- if isinstance(device, dict):
- device_list.append(device)
- continue
- if not isinstance(device, str):
- raise errors.DockerException(
- f'Invalid device type {type(device)}'
- )
- device_mapping = device.split(':')
- if device_mapping:
- path_on_host = device_mapping[0]
- if len(device_mapping) > 1:
- path_in_container = device_mapping[1]
- else:
- path_in_container = path_on_host
- if len(device_mapping) > 2:
- permissions = device_mapping[2]
- else:
- permissions = 'rwm'
- device_list.append({
- 'PathOnHost': path_on_host,
- 'PathInContainer': path_in_container,
- 'CgroupPermissions': permissions
- })
- return device_list
-
-
- def kwargs_from_env(ssl_version=None, assert_hostname=None, environment=None):
- if not environment:
- environment = os.environ
- host = environment.get('DOCKER_HOST')
-
- # empty string for cert path is the same as unset.
- cert_path = environment.get('DOCKER_CERT_PATH') or None
-
- # empty string for tls verify counts as "false".
- # Any value or 'unset' counts as true.
- tls_verify = environment.get('DOCKER_TLS_VERIFY')
- if tls_verify == '':
- tls_verify = False
- else:
- tls_verify = tls_verify is not None
- enable_tls = cert_path or tls_verify
-
- params = {}
-
- if host:
- params['base_url'] = host
-
- if not enable_tls:
- return params
-
- if not cert_path:
- cert_path = os.path.join(os.path.expanduser('~'), '.docker')
-
- if not tls_verify and assert_hostname is None:
- # assert_hostname is a subset of TLS verification,
- # so if it's not set already then set it to false.
- assert_hostname = False
-
- params['tls'] = TLSConfig(
- client_cert=(os.path.join(cert_path, 'cert.pem'),
- os.path.join(cert_path, 'key.pem')),
- ca_cert=os.path.join(cert_path, 'ca.pem'),
- verify=tls_verify,
- ssl_version=ssl_version,
- assert_hostname=assert_hostname,
- )
-
- return params
-
-
- def convert_filters(filters):
- result = {}
- for k, v in iter(filters.items()):
- if isinstance(v, bool):
- v = 'true' if v else 'false'
- if not isinstance(v, list):
- v = [v, ]
- result[k] = [
- str(item) if not isinstance(item, str) else item
- for item in v
- ]
- return json.dumps(result)
-
-
- def datetime_to_timestamp(dt):
- """Convert a UTC datetime to a Unix timestamp"""
- delta = dt - datetime.utcfromtimestamp(0)
- return delta.seconds + delta.days * 24 * 3600
-
-
- def parse_bytes(s):
- if isinstance(s, (int, float,)):
- return s
- if len(s) == 0:
- return 0
-
- if s[-2:-1].isalpha() and s[-1].isalpha():
- if s[-1] == "b" or s[-1] == "B":
- s = s[:-1]
- units = BYTE_UNITS
- suffix = s[-1].lower()
-
- # Check if the variable is a string representation of an int
- # without a units part. Assuming that the units are bytes.
- if suffix.isdigit():
- digits_part = s
- suffix = 'b'
- else:
- digits_part = s[:-1]
-
- if suffix in units.keys() or suffix.isdigit():
- try:
- digits = float(digits_part)
- except ValueError:
- raise errors.DockerException(
- 'Failed converting the string value for memory ({}) to'
- ' an integer.'.format(digits_part)
- )
-
- # Reconvert to long for the final result
- s = int(digits * units[suffix])
- else:
- raise errors.DockerException(
- 'The specified value for memory ({}) should specify the'
- ' units. The postfix should be one of the `b` `k` `m` `g`'
- ' characters'.format(s)
- )
-
- return s
-
-
- def normalize_links(links):
- if isinstance(links, dict):
- links = iter(links.items())
-
- return [f'{k}:{v}' if v else k for k, v in sorted(links)]
-
-
- def parse_env_file(env_file):
- """
- Reads a line-separated environment file.
- The format of each line should be "key=value".
- """
- environment = {}
-
- with open(env_file) as f:
- for line in f:
-
- if line[0] == '#':
- continue
-
- line = line.strip()
- if not line:
- continue
-
- parse_line = line.split('=', 1)
- if len(parse_line) == 2:
- k, v = parse_line
- environment[k] = v
- else:
- raise errors.DockerException(
- 'Invalid line in environment file {}:\n{}'.format(
- env_file, line))
-
- return environment
-
-
- def split_command(command):
- return shlex.split(command)
-
-
- def format_environment(environment):
- def format_env(key, value):
- if value is None:
- return key
- if isinstance(value, bytes):
- value = value.decode('utf-8')
-
- return f'{key}={value}'
- return [format_env(*var) for var in iter(environment.items())]
-
-
- def format_extra_hosts(extra_hosts, task=False):
- # Use format dictated by Swarm API if container is part of a task
- if task:
- return [
- f'{v} {k}' for k, v in sorted(iter(extra_hosts.items()))
- ]
-
- return [
- f'{k}:{v}' for k, v in sorted(iter(extra_hosts.items()))
- ]
-
-
- def create_host_config(self, *args, **kwargs):
- raise errors.DeprecatedMethod(
- 'utils.create_host_config has been removed. Please use a '
- 'docker.types.HostConfig object instead.'
- )
|