123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251 |
- import sys
- from typing import Awaitable, Callable, Dict, Iterable, Optional, Tuple, Type, Union
-
- if sys.version_info >= (3, 8):
- from typing import Literal, Protocol, TypedDict
- else:
- from typing_extensions import Literal, Protocol, TypedDict
-
- __all__ = (
- "ASGIVersions",
- "HTTPScope",
- "WebSocketScope",
- "LifespanScope",
- "WWWScope",
- "Scope",
- "HTTPRequestEvent",
- "HTTPResponseStartEvent",
- "HTTPResponseBodyEvent",
- "HTTPResponseTrailersEvent",
- "HTTPServerPushEvent",
- "HTTPDisconnectEvent",
- "WebSocketConnectEvent",
- "WebSocketAcceptEvent",
- "WebSocketReceiveEvent",
- "WebSocketSendEvent",
- "WebSocketResponseStartEvent",
- "WebSocketResponseBodyEvent",
- "WebSocketDisconnectEvent",
- "WebSocketCloseEvent",
- "LifespanStartupEvent",
- "LifespanShutdownEvent",
- "LifespanStartupCompleteEvent",
- "LifespanStartupFailedEvent",
- "LifespanShutdownCompleteEvent",
- "LifespanShutdownFailedEvent",
- "ASGIReceiveEvent",
- "ASGISendEvent",
- "ASGIReceiveCallable",
- "ASGISendCallable",
- "ASGI2Protocol",
- "ASGI2Application",
- "ASGI3Application",
- "ASGIApplication",
- )
-
-
- class ASGIVersions(TypedDict):
- spec_version: str
- version: Union[Literal["2.0"], Literal["3.0"]]
-
-
- class HTTPScope(TypedDict):
- type: Literal["http"]
- asgi: ASGIVersions
- http_version: str
- method: str
- scheme: str
- path: str
- raw_path: bytes
- query_string: bytes
- root_path: str
- headers: Iterable[Tuple[bytes, bytes]]
- client: Optional[Tuple[str, int]]
- server: Optional[Tuple[str, Optional[int]]]
- extensions: Optional[Dict[str, Dict[object, object]]]
-
-
- class WebSocketScope(TypedDict):
- type: Literal["websocket"]
- asgi: ASGIVersions
- http_version: str
- scheme: str
- path: str
- raw_path: bytes
- query_string: bytes
- root_path: str
- headers: Iterable[Tuple[bytes, bytes]]
- client: Optional[Tuple[str, int]]
- server: Optional[Tuple[str, Optional[int]]]
- subprotocols: Iterable[str]
- extensions: Optional[Dict[str, Dict[object, object]]]
-
-
- class LifespanScope(TypedDict):
- type: Literal["lifespan"]
- asgi: ASGIVersions
-
-
- WWWScope = Union[HTTPScope, WebSocketScope]
- Scope = Union[HTTPScope, WebSocketScope, LifespanScope]
-
-
- class HTTPRequestEvent(TypedDict):
- type: Literal["http.request"]
- body: bytes
- more_body: bool
-
-
- class HTTPResponseStartEvent(TypedDict):
- type: Literal["http.response.start"]
- status: int
- headers: Iterable[Tuple[bytes, bytes]]
- trailers: bool
-
-
- class HTTPResponseBodyEvent(TypedDict):
- type: Literal["http.response.body"]
- body: bytes
- more_body: bool
-
-
- class HTTPResponseTrailersEvent(TypedDict):
- type: Literal["http.response.trailers"]
- headers: Iterable[Tuple[bytes, bytes]]
- more_trailers: bool
-
-
- class HTTPServerPushEvent(TypedDict):
- type: Literal["http.response.push"]
- path: str
- headers: Iterable[Tuple[bytes, bytes]]
-
-
- class HTTPDisconnectEvent(TypedDict):
- type: Literal["http.disconnect"]
-
-
- class WebSocketConnectEvent(TypedDict):
- type: Literal["websocket.connect"]
-
-
- class WebSocketAcceptEvent(TypedDict):
- type: Literal["websocket.accept"]
- subprotocol: Optional[str]
- headers: Iterable[Tuple[bytes, bytes]]
-
-
- class WebSocketReceiveEvent(TypedDict):
- type: Literal["websocket.receive"]
- bytes: Optional[bytes]
- text: Optional[str]
-
-
- class WebSocketSendEvent(TypedDict):
- type: Literal["websocket.send"]
- bytes: Optional[bytes]
- text: Optional[str]
-
-
- class WebSocketResponseStartEvent(TypedDict):
- type: Literal["websocket.http.response.start"]
- status: int
- headers: Iterable[Tuple[bytes, bytes]]
-
-
- class WebSocketResponseBodyEvent(TypedDict):
- type: Literal["websocket.http.response.body"]
- body: bytes
- more_body: bool
-
-
- class WebSocketDisconnectEvent(TypedDict):
- type: Literal["websocket.disconnect"]
- code: int
-
-
- class WebSocketCloseEvent(TypedDict):
- type: Literal["websocket.close"]
- code: int
- reason: Optional[str]
-
-
- class LifespanStartupEvent(TypedDict):
- type: Literal["lifespan.startup"]
-
-
- class LifespanShutdownEvent(TypedDict):
- type: Literal["lifespan.shutdown"]
-
-
- class LifespanStartupCompleteEvent(TypedDict):
- type: Literal["lifespan.startup.complete"]
-
-
- class LifespanStartupFailedEvent(TypedDict):
- type: Literal["lifespan.startup.failed"]
- message: str
-
-
- class LifespanShutdownCompleteEvent(TypedDict):
- type: Literal["lifespan.shutdown.complete"]
-
-
- class LifespanShutdownFailedEvent(TypedDict):
- type: Literal["lifespan.shutdown.failed"]
- message: str
-
-
- ASGIReceiveEvent = Union[
- HTTPRequestEvent,
- HTTPDisconnectEvent,
- WebSocketConnectEvent,
- WebSocketReceiveEvent,
- WebSocketDisconnectEvent,
- LifespanStartupEvent,
- LifespanShutdownEvent,
- ]
-
-
- ASGISendEvent = Union[
- HTTPResponseStartEvent,
- HTTPResponseBodyEvent,
- HTTPResponseTrailersEvent,
- HTTPServerPushEvent,
- HTTPDisconnectEvent,
- WebSocketAcceptEvent,
- WebSocketSendEvent,
- WebSocketResponseStartEvent,
- WebSocketResponseBodyEvent,
- WebSocketCloseEvent,
- LifespanStartupCompleteEvent,
- LifespanStartupFailedEvent,
- LifespanShutdownCompleteEvent,
- LifespanShutdownFailedEvent,
- ]
-
-
- ASGIReceiveCallable = Callable[[], Awaitable[ASGIReceiveEvent]]
- ASGISendCallable = Callable[[ASGISendEvent], Awaitable[None]]
-
-
- class ASGI2Protocol(Protocol):
- def __init__(self, scope: Scope) -> None:
- ...
-
- async def __call__(
- self, receive: ASGIReceiveCallable, send: ASGISendCallable
- ) -> None:
- ...
-
-
- ASGI2Application = Type[ASGI2Protocol]
- ASGI3Application = Callable[
- [
- Scope,
- ASGIReceiveCallable,
- ASGISendCallable,
- ],
- Awaitable[None],
- ]
- ASGIApplication = Union[ASGI2Application, ASGI3Application]
|