123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187 |
- import errno
- import os
- import select
- import socket as pysocket
- import struct
-
- try:
- from ..transport import NpipeSocket
- except ImportError:
- NpipeSocket = type(None)
-
-
- STDOUT = 1
- STDERR = 2
-
-
- class SocketError(Exception):
- pass
-
-
- # NpipeSockets have their own error types
- # pywintypes.error: (109, 'ReadFile', 'The pipe has been ended.')
- NPIPE_ENDED = 109
-
-
- def read(socket, n=4096):
- """
- Reads at most n bytes from socket
- """
-
- recoverable_errors = (errno.EINTR, errno.EDEADLK, errno.EWOULDBLOCK)
-
- if not isinstance(socket, NpipeSocket):
- if not hasattr(select, "poll"):
- # Limited to 1024
- select.select([socket], [], [])
- else:
- poll = select.poll()
- poll.register(socket, select.POLLIN | select.POLLPRI)
- poll.poll()
-
- try:
- if hasattr(socket, 'recv'):
- return socket.recv(n)
- if isinstance(socket, getattr(pysocket, 'SocketIO')):
- return socket.read(n)
- return os.read(socket.fileno(), n)
- except OSError as e:
- if e.errno not in recoverable_errors:
- raise
- except Exception as e:
- is_pipe_ended = (isinstance(socket, NpipeSocket) and
- len(e.args) > 0 and
- e.args[0] == NPIPE_ENDED)
- if is_pipe_ended:
- # npipes don't support duplex sockets, so we interpret
- # a PIPE_ENDED error as a close operation (0-length read).
- return ''
- raise
-
-
- def read_exactly(socket, n):
- """
- Reads exactly n bytes from socket
- Raises SocketError if there isn't enough data
- """
- data = bytes()
- while len(data) < n:
- next_data = read(socket, n - len(data))
- if not next_data:
- raise SocketError("Unexpected EOF")
- data += next_data
- return data
-
-
- def next_frame_header(socket):
- """
- Returns the stream and size of the next frame of data waiting to be read
- from socket, according to the protocol defined here:
-
- https://docs.docker.com/engine/api/v1.24/#attach-to-a-container
- """
- try:
- data = read_exactly(socket, 8)
- except SocketError:
- return (-1, -1)
-
- stream, actual = struct.unpack('>BxxxL', data)
- return (stream, actual)
-
-
- def frames_iter(socket, tty):
- """
- Return a generator of frames read from socket. A frame is a tuple where
- the first item is the stream number and the second item is a chunk of data.
-
- If the tty setting is enabled, the streams are multiplexed into the stdout
- stream.
- """
- if tty:
- return ((STDOUT, frame) for frame in frames_iter_tty(socket))
- else:
- return frames_iter_no_tty(socket)
-
-
- def frames_iter_no_tty(socket):
- """
- Returns a generator of data read from the socket when the tty setting is
- not enabled.
- """
- while True:
- (stream, n) = next_frame_header(socket)
- if n < 0:
- break
- while n > 0:
- result = read(socket, n)
- if result is None:
- continue
- data_length = len(result)
- if data_length == 0:
- # We have reached EOF
- return
- n -= data_length
- yield (stream, result)
-
-
- def frames_iter_tty(socket):
- """
- Return a generator of data read from the socket when the tty setting is
- enabled.
- """
- while True:
- result = read(socket)
- if len(result) == 0:
- # We have reached EOF
- return
- yield result
-
-
- def consume_socket_output(frames, demux=False):
- """
- Iterate through frames read from the socket and return the result.
-
- Args:
-
- demux (bool):
- If False, stdout and stderr are multiplexed, and the result is the
- concatenation of all the frames. If True, the streams are
- demultiplexed, and the result is a 2-tuple where each item is the
- concatenation of frames belonging to the same stream.
- """
- if demux is False:
- # If the streams are multiplexed, the generator returns strings, that
- # we just need to concatenate.
- return bytes().join(frames)
-
- # If the streams are demultiplexed, the generator yields tuples
- # (stdout, stderr)
- out = [None, None]
- for frame in frames:
- # It is guaranteed that for each frame, one and only one stream
- # is not None.
- assert frame != (None, None)
- if frame[0] is not None:
- if out[0] is None:
- out[0] = frame[0]
- else:
- out[0] += frame[0]
- else:
- if out[1] is None:
- out[1] = frame[1]
- else:
- out[1] += frame[1]
- return tuple(out)
-
-
- def demux_adaptor(stream_id, data):
- """
- Utility to demultiplex stdout and stderr when reading frames from the
- socket.
- """
- if stream_id == STDOUT:
- return (data, None)
- elif stream_id == STDERR:
- return (None, data)
- else:
- raise ValueError(f'{stream_id} is not a valid stream')
|