123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707 |
- from __future__ import annotations
-
- import enum
- import logging
- import uuid
- from typing import Generator, List, Optional, Type, Union
-
- from .exceptions import (
- ConnectionClosed,
- ConnectionClosedError,
- ConnectionClosedOK,
- InvalidState,
- PayloadTooBig,
- ProtocolError,
- )
- from .extensions import Extension
- from .frames import (
- OK_CLOSE_CODES,
- OP_BINARY,
- OP_CLOSE,
- OP_CONT,
- OP_PING,
- OP_PONG,
- OP_TEXT,
- Close,
- Frame,
- )
- from .http11 import Request, Response
- from .streams import StreamReader
- from .typing import LoggerLike, Origin, Subprotocol
-
-
- __all__ = [
- "Protocol",
- "Side",
- "State",
- "SEND_EOF",
- ]
-
- Event = Union[Request, Response, Frame]
- """Events that :meth:`~Protocol.events_received` may return."""
-
-
- class Side(enum.IntEnum):
- """A WebSocket connection is either a server or a client."""
-
- SERVER, CLIENT = range(2)
-
-
- SERVER = Side.SERVER
- CLIENT = Side.CLIENT
-
-
- class State(enum.IntEnum):
- """A WebSocket connection is in one of these four states."""
-
- CONNECTING, OPEN, CLOSING, CLOSED = range(4)
-
-
- CONNECTING = State.CONNECTING
- OPEN = State.OPEN
- CLOSING = State.CLOSING
- CLOSED = State.CLOSED
-
-
- SEND_EOF = b""
- """Sentinel signaling that the TCP connection must be half-closed."""
-
-
- class Protocol:
- """
- Sans-I/O implementation of a WebSocket connection.
-
- Args:
- side: :attr:`~Side.CLIENT` or :attr:`~Side.SERVER`.
- state: initial state of the WebSocket connection.
- max_size: maximum size of incoming messages in bytes;
- :obj:`None` disables the limit.
- logger: logger for this connection; depending on ``side``,
- defaults to ``logging.getLogger("websockets.client")``
- or ``logging.getLogger("websockets.server")``;
- see the :doc:`logging guide <../../topics/logging>` for details.
-
- """
-
- def __init__(
- self,
- side: Side,
- *,
- state: State = OPEN,
- max_size: Optional[int] = 2**20,
- logger: Optional[LoggerLike] = None,
- ) -> None:
- # Unique identifier. For logs.
- self.id: uuid.UUID = uuid.uuid4()
- """Unique identifier of the connection. Useful in logs."""
-
- # Logger or LoggerAdapter for this connection.
- if logger is None:
- logger = logging.getLogger(f"websockets.{side.name.lower()}")
- self.logger: LoggerLike = logger
- """Logger for this connection."""
-
- # Track if DEBUG is enabled. Shortcut logging calls if it isn't.
- self.debug = logger.isEnabledFor(logging.DEBUG)
-
- # Connection side. CLIENT or SERVER.
- self.side = side
-
- # Connection state. Initially OPEN because subclasses handle CONNECTING.
- self.state = state
-
- # Maximum size of incoming messages in bytes.
- self.max_size = max_size
-
- # Current size of incoming message in bytes. Only set while reading a
- # fragmented message i.e. a data frames with the FIN bit not set.
- self.cur_size: Optional[int] = None
-
- # True while sending a fragmented message i.e. a data frames with the
- # FIN bit not set.
- self.expect_continuation_frame = False
-
- # WebSocket protocol parameters.
- self.origin: Optional[Origin] = None
- self.extensions: List[Extension] = []
- self.subprotocol: Optional[Subprotocol] = None
-
- # Close code and reason, set when a close frame is sent or received.
- self.close_rcvd: Optional[Close] = None
- self.close_sent: Optional[Close] = None
- self.close_rcvd_then_sent: Optional[bool] = None
-
- # Track if an exception happened during the handshake.
- self.handshake_exc: Optional[Exception] = None
- """
- Exception to raise if the opening handshake failed.
-
- :obj:`None` if the opening handshake succeeded.
-
- """
-
- # Track if send_eof() was called.
- self.eof_sent = False
-
- # Parser state.
- self.reader = StreamReader()
- self.events: List[Event] = []
- self.writes: List[bytes] = []
- self.parser = self.parse()
- next(self.parser) # start coroutine
- self.parser_exc: Optional[Exception] = None
-
- @property
- def state(self) -> State:
- """
- WebSocket connection state.
-
- Defined in 4.1, 4.2, 7.1.3, and 7.1.4 of :rfc:`6455`.
-
- """
- return self._state
-
- @state.setter
- def state(self, state: State) -> None:
- if self.debug:
- self.logger.debug("= connection is %s", state.name)
- self._state = state
-
- @property
- def close_code(self) -> Optional[int]:
- """
- `WebSocket close code`_.
-
- .. _WebSocket close code:
- https://www.rfc-editor.org/rfc/rfc6455.html#section-7.1.5
-
- :obj:`None` if the connection isn't closed yet.
-
- """
- if self.state is not CLOSED:
- return None
- elif self.close_rcvd is None:
- return 1006
- else:
- return self.close_rcvd.code
-
- @property
- def close_reason(self) -> Optional[str]:
- """
- `WebSocket close reason`_.
-
- .. _WebSocket close reason:
- https://www.rfc-editor.org/rfc/rfc6455.html#section-7.1.6
-
- :obj:`None` if the connection isn't closed yet.
-
- """
- if self.state is not CLOSED:
- return None
- elif self.close_rcvd is None:
- return ""
- else:
- return self.close_rcvd.reason
-
- @property
- def close_exc(self) -> ConnectionClosed:
- """
- Exception to raise when trying to interact with a closed connection.
-
- Don't raise this exception while the connection :attr:`state`
- is :attr:`~websockets.protocol.State.CLOSING`; wait until
- it's :attr:`~websockets.protocol.State.CLOSED`.
-
- Indeed, the exception includes the close code and reason, which are
- known only once the connection is closed.
-
- Raises:
- AssertionError: if the connection isn't closed yet.
-
- """
- assert self.state is CLOSED, "connection isn't closed yet"
- exc_type: Type[ConnectionClosed]
- if (
- self.close_rcvd is not None
- and self.close_sent is not None
- and self.close_rcvd.code in OK_CLOSE_CODES
- and self.close_sent.code in OK_CLOSE_CODES
- ):
- exc_type = ConnectionClosedOK
- else:
- exc_type = ConnectionClosedError
- exc: ConnectionClosed = exc_type(
- self.close_rcvd,
- self.close_sent,
- self.close_rcvd_then_sent,
- )
- # Chain to the exception raised in the parser, if any.
- exc.__cause__ = self.parser_exc
- return exc
-
- # Public methods for receiving data.
-
- def receive_data(self, data: bytes) -> None:
- """
- Receive data from the network.
-
- After calling this method:
-
- - You must call :meth:`data_to_send` and send this data to the network.
- - You should call :meth:`events_received` and process resulting events.
-
- Raises:
- EOFError: if :meth:`receive_eof` was called earlier.
-
- """
- self.reader.feed_data(data)
- next(self.parser)
-
- def receive_eof(self) -> None:
- """
- Receive the end of the data stream from the network.
-
- After calling this method:
-
- - You must call :meth:`data_to_send` and send this data to the network;
- it will return ``[b""]``, signaling the end of the stream, or ``[]``.
- - You aren't expected to call :meth:`events_received`; it won't return
- any new events.
-
- Raises:
- EOFError: if :meth:`receive_eof` was called earlier.
-
- """
- self.reader.feed_eof()
- next(self.parser)
-
- # Public methods for sending events.
-
- def send_continuation(self, data: bytes, fin: bool) -> None:
- """
- Send a `Continuation frame`_.
-
- .. _Continuation frame:
- https://datatracker.ietf.org/doc/html/rfc6455#section-5.6
-
- Parameters:
- data: payload containing the same kind of data
- as the initial frame.
- fin: FIN bit; set it to :obj:`True` if this is the last frame
- of a fragmented message and to :obj:`False` otherwise.
-
- Raises:
- ProtocolError: if a fragmented message isn't in progress.
-
- """
- if not self.expect_continuation_frame:
- raise ProtocolError("unexpected continuation frame")
- self.expect_continuation_frame = not fin
- self.send_frame(Frame(OP_CONT, data, fin))
-
- def send_text(self, data: bytes, fin: bool = True) -> None:
- """
- Send a `Text frame`_.
-
- .. _Text frame:
- https://datatracker.ietf.org/doc/html/rfc6455#section-5.6
-
- Parameters:
- data: payload containing text encoded with UTF-8.
- fin: FIN bit; set it to :obj:`False` if this is the first frame of
- a fragmented message.
-
- Raises:
- ProtocolError: if a fragmented message is in progress.
-
- """
- if self.expect_continuation_frame:
- raise ProtocolError("expected a continuation frame")
- self.expect_continuation_frame = not fin
- self.send_frame(Frame(OP_TEXT, data, fin))
-
- def send_binary(self, data: bytes, fin: bool = True) -> None:
- """
- Send a `Binary frame`_.
-
- .. _Binary frame:
- https://datatracker.ietf.org/doc/html/rfc6455#section-5.6
-
- Parameters:
- data: payload containing arbitrary binary data.
- fin: FIN bit; set it to :obj:`False` if this is the first frame of
- a fragmented message.
-
- Raises:
- ProtocolError: if a fragmented message is in progress.
-
- """
- if self.expect_continuation_frame:
- raise ProtocolError("expected a continuation frame")
- self.expect_continuation_frame = not fin
- self.send_frame(Frame(OP_BINARY, data, fin))
-
- def send_close(self, code: Optional[int] = None, reason: str = "") -> None:
- """
- Send a `Close frame`_.
-
- .. _Close frame:
- https://datatracker.ietf.org/doc/html/rfc6455#section-5.5.1
-
- Parameters:
- code: close code.
- reason: close reason.
-
- Raises:
- ProtocolError: if a fragmented message is being sent, if the code
- isn't valid, or if a reason is provided without a code
-
- """
- if self.expect_continuation_frame:
- raise ProtocolError("expected a continuation frame")
- if code is None:
- if reason != "":
- raise ProtocolError("cannot send a reason without a code")
- close = Close(1005, "")
- data = b""
- else:
- close = Close(code, reason)
- data = close.serialize()
- # send_frame() guarantees that self.state is OPEN at this point.
- # 7.1.3. The WebSocket Closing Handshake is Started
- self.send_frame(Frame(OP_CLOSE, data))
- self.close_sent = close
- self.state = CLOSING
-
- def send_ping(self, data: bytes) -> None:
- """
- Send a `Ping frame`_.
-
- .. _Ping frame:
- https://datatracker.ietf.org/doc/html/rfc6455#section-5.5.2
-
- Parameters:
- data: payload containing arbitrary binary data.
-
- """
- self.send_frame(Frame(OP_PING, data))
-
- def send_pong(self, data: bytes) -> None:
- """
- Send a `Pong frame`_.
-
- .. _Pong frame:
- https://datatracker.ietf.org/doc/html/rfc6455#section-5.5.3
-
- Parameters:
- data: payload containing arbitrary binary data.
-
- """
- self.send_frame(Frame(OP_PONG, data))
-
- def fail(self, code: int, reason: str = "") -> None:
- """
- `Fail the WebSocket connection`_.
-
- .. _Fail the WebSocket connection:
- https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.7
-
- Parameters:
- code: close code
- reason: close reason
-
- Raises:
- ProtocolError: if the code isn't valid.
- """
- # 7.1.7. Fail the WebSocket Connection
-
- # Send a close frame when the state is OPEN (a close frame was already
- # sent if it's CLOSING), except when failing the connection because
- # of an error reading from or writing to the network.
- if self.state is OPEN:
- if code != 1006:
- close = Close(code, reason)
- data = close.serialize()
- self.send_frame(Frame(OP_CLOSE, data))
- self.close_sent = close
- self.state = CLOSING
-
- # When failing the connection, a server closes the TCP connection
- # without waiting for the client to complete the handshake, while a
- # client waits for the server to close the TCP connection, possibly
- # after sending a close frame that the client will ignore.
- if self.side is SERVER and not self.eof_sent:
- self.send_eof()
-
- # 7.1.7. Fail the WebSocket Connection "An endpoint MUST NOT continue
- # to attempt to process data(including a responding Close frame) from
- # the remote endpoint after being instructed to _Fail the WebSocket
- # Connection_."
- self.parser = self.discard()
- next(self.parser) # start coroutine
-
- # Public method for getting incoming events after receiving data.
-
- def events_received(self) -> List[Event]:
- """
- Fetch events generated from data received from the network.
-
- Call this method immediately after any of the ``receive_*()`` methods.
-
- Process resulting events, likely by passing them to the application.
-
- Returns:
- List[Event]: Events read from the connection.
- """
- events, self.events = self.events, []
- return events
-
- # Public method for getting outgoing data after receiving data or sending events.
-
- def data_to_send(self) -> List[bytes]:
- """
- Obtain data to send to the network.
-
- Call this method immediately after any of the ``receive_*()``,
- ``send_*()``, or :meth:`fail` methods.
-
- Write resulting data to the connection.
-
- The empty bytestring :data:`~websockets.protocol.SEND_EOF` signals
- the end of the data stream. When you receive it, half-close the TCP
- connection.
-
- Returns:
- List[bytes]: Data to write to the connection.
-
- """
- writes, self.writes = self.writes, []
- return writes
-
- def close_expected(self) -> bool:
- """
- Tell if the TCP connection is expected to close soon.
-
- Call this method immediately after any of the ``receive_*()``,
- ``send_close()``, or :meth:`fail` methods.
-
- If it returns :obj:`True`, schedule closing the TCP connection after a
- short timeout if the other side hasn't already closed it.
-
- Returns:
- bool: Whether the TCP connection is expected to close soon.
-
- """
- # We expect a TCP close if and only if we sent a close frame:
- # * Normal closure: once we send a close frame, we expect a TCP close:
- # server waits for client to complete the TCP closing handshake;
- # client waits for server to initiate the TCP closing handshake.
- # * Abnormal closure: we always send a close frame and the same logic
- # applies, except on EOFError where we don't send a close frame
- # because we already received the TCP close, so we don't expect it.
- # We already got a TCP Close if and only if the state is CLOSED.
- return self.state is CLOSING or self.handshake_exc is not None
-
- # Private methods for receiving data.
-
- def parse(self) -> Generator[None, None, None]:
- """
- Parse incoming data into frames.
-
- :meth:`receive_data` and :meth:`receive_eof` run this generator
- coroutine until it needs more data or reaches EOF.
-
- :meth:`parse` never raises an exception. Instead, it sets the
- :attr:`parser_exc` and yields control.
-
- """
- try:
- while True:
- if (yield from self.reader.at_eof()):
- if self.debug:
- self.logger.debug("< EOF")
- # If the WebSocket connection is closed cleanly, with a
- # closing handhshake, recv_frame() substitutes parse()
- # with discard(). This branch is reached only when the
- # connection isn't closed cleanly.
- raise EOFError("unexpected end of stream")
-
- if self.max_size is None:
- max_size = None
- elif self.cur_size is None:
- max_size = self.max_size
- else:
- max_size = self.max_size - self.cur_size
-
- # During a normal closure, execution ends here on the next
- # iteration of the loop after receiving a close frame. At
- # this point, recv_frame() replaced parse() by discard().
- frame = yield from Frame.parse(
- self.reader.read_exact,
- mask=self.side is SERVER,
- max_size=max_size,
- extensions=self.extensions,
- )
-
- if self.debug:
- self.logger.debug("< %s", frame)
-
- self.recv_frame(frame)
-
- except ProtocolError as exc:
- self.fail(1002, str(exc))
- self.parser_exc = exc
-
- except EOFError as exc:
- self.fail(1006, str(exc))
- self.parser_exc = exc
-
- except UnicodeDecodeError as exc:
- self.fail(1007, f"{exc.reason} at position {exc.start}")
- self.parser_exc = exc
-
- except PayloadTooBig as exc:
- self.fail(1009, str(exc))
- self.parser_exc = exc
-
- except Exception as exc:
- self.logger.error("parser failed", exc_info=True)
- # Don't include exception details, which may be security-sensitive.
- self.fail(1011)
- self.parser_exc = exc
-
- # During an abnormal closure, execution ends here after catching an
- # exception. At this point, fail() replaced parse() by discard().
- yield
- raise AssertionError("parse() shouldn't step after error")
-
- def discard(self) -> Generator[None, None, None]:
- """
- Discard incoming data.
-
- This coroutine replaces :meth:`parse`:
-
- - after receiving a close frame, during a normal closure (1.4);
- - after sending a close frame, during an abnormal closure (7.1.7).
-
- """
- # The server close the TCP connection in the same circumstances where
- # discard() replaces parse(). The client closes the connection later,
- # after the server closes the connection or a timeout elapses.
- # (The latter case cannot be handled in this Sans-I/O layer.)
- assert (self.side is SERVER) == (self.eof_sent)
- while not (yield from self.reader.at_eof()):
- self.reader.discard()
- if self.debug:
- self.logger.debug("< EOF")
- # A server closes the TCP connection immediately, while a client
- # waits for the server to close the TCP connection.
- if self.side is CLIENT:
- self.send_eof()
- self.state = CLOSED
- # If discard() completes normally, execution ends here.
- yield
- # Once the reader reaches EOF, its feed_data/eof() methods raise an
- # error, so our receive_data/eof() methods don't step the generator.
- raise AssertionError("discard() shouldn't step after EOF")
-
- def recv_frame(self, frame: Frame) -> None:
- """
- Process an incoming frame.
-
- """
- if frame.opcode is OP_TEXT or frame.opcode is OP_BINARY:
- if self.cur_size is not None:
- raise ProtocolError("expected a continuation frame")
- if frame.fin:
- self.cur_size = None
- else:
- self.cur_size = len(frame.data)
-
- elif frame.opcode is OP_CONT:
- if self.cur_size is None:
- raise ProtocolError("unexpected continuation frame")
- if frame.fin:
- self.cur_size = None
- else:
- self.cur_size += len(frame.data)
-
- elif frame.opcode is OP_PING:
- # 5.5.2. Ping: "Upon receipt of a Ping frame, an endpoint MUST
- # send a Pong frame in response"
- pong_frame = Frame(OP_PONG, frame.data)
- self.send_frame(pong_frame)
-
- elif frame.opcode is OP_PONG:
- # 5.5.3 Pong: "A response to an unsolicited Pong frame is not
- # expected."
- pass
-
- elif frame.opcode is OP_CLOSE:
- # 7.1.5. The WebSocket Connection Close Code
- # 7.1.6. The WebSocket Connection Close Reason
- self.close_rcvd = Close.parse(frame.data)
- if self.state is CLOSING:
- assert self.close_sent is not None
- self.close_rcvd_then_sent = False
-
- if self.cur_size is not None:
- raise ProtocolError("incomplete fragmented message")
-
- # 5.5.1 Close: "If an endpoint receives a Close frame and did
- # not previously send a Close frame, the endpoint MUST send a
- # Close frame in response. (When sending a Close frame in
- # response, the endpoint typically echos the status code it
- # received.)"
-
- if self.state is OPEN:
- # Echo the original data instead of re-serializing it with
- # Close.serialize() because that fails when the close frame
- # is empty and Close.parse() synthesizes a 1005 close code.
- # The rest is identical to send_close().
- self.send_frame(Frame(OP_CLOSE, frame.data))
- self.close_sent = self.close_rcvd
- self.close_rcvd_then_sent = True
- self.state = CLOSING
-
- # 7.1.2. Start the WebSocket Closing Handshake: "Once an
- # endpoint has both sent and received a Close control frame,
- # that endpoint SHOULD _Close the WebSocket Connection_"
-
- # A server closes the TCP connection immediately, while a client
- # waits for the server to close the TCP connection.
- if self.side is SERVER:
- self.send_eof()
-
- # 1.4. Closing Handshake: "after receiving a control frame
- # indicating the connection should be closed, a peer discards
- # any further data received."
- self.parser = self.discard()
- next(self.parser) # start coroutine
-
- else:
- # This can't happen because Frame.parse() validates opcodes.
- raise AssertionError(f"unexpected opcode: {frame.opcode:02x}")
-
- self.events.append(frame)
-
- # Private methods for sending events.
-
- def send_frame(self, frame: Frame) -> None:
- if self.state is not OPEN:
- raise InvalidState(
- f"cannot write to a WebSocket in the {self.state.name} state"
- )
-
- if self.debug:
- self.logger.debug("> %s", frame)
- self.writes.append(
- frame.serialize(mask=self.side is CLIENT, extensions=self.extensions)
- )
-
- def send_eof(self) -> None:
- assert not self.eof_sent
- self.eof_sent = True
- if self.debug:
- self.logger.debug("> EOF")
- self.writes.append(SEND_EOF)
|