Skip to content

Conversation

@guilledk
Copy link
Collaborator

No description provided.

Guillermo Rodriguez and others added 30 commits March 27, 2025 20:36
EventFD class now expects the fd to already be init with open_eventfd
RingBuff Sender and Receiver fully manage SharedMemory and EventFD lifecycles, no aditional ctx mngrs needed
Separate ring buf tests into its own test bed
Add parametrization to test and cancellation
Add docstrings
Add simple testing data gen module .samples
A few things that can fundamentally change,

- UDS addresses now always encapsulate the local and remote pid such
  that it denotes each side's process much like a TCP *port*.
  |_ `.__init__()` takes a new `maybe_pid: int`.
  |_ this required changes to the `.ipc._uds` backend which will come in
     an subsequent commit!
  |_ `UDSAddress.address_type` becomes a `tuple[str, int]` just like the
      TCP case.
  |_ adjust `wrap_address()` to match.
- use a new `_state.get_rt_dir() -> Path` as the default location for
  UDS socket file: now under `XDG_RUNTIME_DIR'/tractor/` subdir by
  default.
- re-implement `USDAddress.get_random()` to use both the local
  `Actor.uid` (if available) and at least the pid for its socket file
  name.

Removals,
- drop the loop generated `_default_addrs`, simplify to just
  `_default_lo_addrs` for per-transport default registry addresses.
  |_ change to `_address_types: dict[str, Type[Address]]` instead of
     separate types `list`.
  |_ adjust `is_wrapped_addr()` to just check `in _addr_types.values()`.
- comment out `Address.open_stream()` it's unused and i think the wrong
  place for this API.

Renames,
- from `AddressTypes` -> `UnwrappedAddress`, since it's a simple type
  union and all this type set is, is the simple python data-structures
  we encode to for the wire.
  |_ see note about possibly implementing the `.[un]wrap()` stuff as
     `msgspec` codec `enc/dec_hook()`s instead!

Additions,
- add a `mk_uuid()` to be used throughout the runtime including for
  generating the `Aid.uuid` part.
- tons of notes around follow up refinements!
Such that any UDS socket pair is represented (and with the recent
updates to) a `USDAddress` via a similar pair-`tuple[str, int]` as TCP
sockets, a pair of the `.filepath: Path` & the peer proc's `.pid: int`
which we read from the underlying `socket.socket` using
`.set/getsockopt()` calls

Impl deats,
- using the Linux specific APIs, we add a `get_peer_info()` which reads
  the `(pid, uid, gid)` using the `SOL_SOCKET` and `SOL_PEECRED` opts to
  `sock.getsockopt()`.
  |_ this presumes the client has been correspondingly configured to
     deliver the creds via a `sock.setsockopt(SOL_SOCKET, SO_PASSCRED,
     1)` call - this required us to override `trio.open_unix_socket()`.
- override `trio.open_unix_socket()` as per the above bullet to ensure
  connecting peers always transmit "credentials" options info to the
  listener.
- update `.get_stream_addrs()` to always call `get_peer_info()` and
  extract the peer's pid for the `raddr` and use `os.getpid()` for
  `laddr` (obvi).
  |_ as part of the new impl also `log.info()` the creds-info deats and
    socket-file path.
  |_ handle the oddity where it depends which of `.getpeername()` or
    `.getsockname()` will return the file-path; i think it's to do with
    who is client vs. server?

Related refinements,
- set `.layer_key: int = 4` for the "transport layer" ;)
- tweak some typing and multi-line unpacking in `.ipc/_tcp`.
For those mods where it's just a type-alias (name) import change.
Previously whenever an `ActorNursery.start_actor()` call did not receive
a `bind_addrs` arg we would allocate the default `(localhost, 0)` pairs
in the parent, for UDS this obviously won't work nor is it ideal bc it's
nicer to have the actor to be a socket server (who calls
`Address.open_listener()`) define the socket-file-name containing their
unique ID info such as pid, actor-uuid etc.

As such this moves "random" generation of server addresses to the
child-side of a subactor's spawn-sequence when it's sin-`bind_addrs`;
i.e. we do the allocation of the `Address.get_random()` addrs inside
`._runtime.async_main()` instead of `Portal.start_actor()` and **only
when** `accept_addrs`/`bind_addrs` was **not provided by the spawning
parent**.

Further this patch get's way more rigorous about the `SpawnSpec`
processing in the child inside `Actor._from_parent()` such that we
handle any invalid msgs **very loudly and pedantically!**

Impl deats,
- do the "random addr generation" in an explicit `for` loop (instead of
  prior comprehension) to allow for more detailed typing of the layered
  calls to the new `._addr` mod.
- use a `match:/case:` for process any invalid `SpawnSpec` payload case
  where we can instead receive a `MsgTypeError` from the `chan.recv()`
  call in `Actor._from_parent()` to raise it immediately instead of
  triggering downstream type-errors XD
  |_ as per the big `#TODO` we prolly want to take from other callers
     of `Channel.recv()` (like in the `._rpc.process_messages()` loop).
  |_ always raise `InternalError` on non-match/fall-through case!
  |_ add a note about not being able to use `breakpoint()` in this
     section due to causality of `SpawnSpec._runtime_vars` not having
     been processed yet..
  |_ always return a third element from `._from_rent()` eventually to be
     the `preferred_transports: list[str]` from the spawning rent.
- use new `._addr.mk_uuid()` and pass to new `Actor.__init__(uuid: str)`
  for all actor creation (including in all the mods tweaked here).
- Move to new type-alias-name `UnwrappedAddress` throughout.
Much like how `Context` has been implemented, try to give tons of high
level details on all the lower level encapsulated primitives, namely the
`.msgstream/.transport` and any useful runtime state.

B)

Impl deats,
- adjust `.from_addr()` to only call `._addr.wrap_address()` when we
  detect `addr` is unwrapped.
- add another `log.runtime()` using the new `.__repr__()` in
  `Channel.from_addr()`.
- change to `UnwrappedAddress` as in prior commits.
Such that whenev the `self._ctx.chan._exc is trans_err` we suppress.
I.e. when the `Channel._exc: Exception|None` error **is the same as**
set by the `._rpc.process_messages()` loop (that is, set to the
underlying transport layer error), we suppress the lowlevel tb,
otherwise we deliver the full tb since likely something at the lowlevel
that we aren't detecting changed/signalled/is-relevant!
Much like we already do in the `._iter_packets()` async-generator which
delivers to `.recv()` and `async for`, handle the `''[Errno 32] Broken
pipe'` case that can show up with unix-domain-socket usage.

Seems like the cause is due to how fast the socket can be torn down
during a registry addr channel ping where,
- the sending side can break the connection faster then the pong side
  can prep its handshake msg,
- the pong side tries to send it's handshake pkt via
  `.SocketStream.send_all()` after the breakage and then raises
  `trio.BrokenResourceError`.
Namely reducing the duplication of class-fields and `TypeVar`s used
for parametrizing the `Address` protocol type,
- drop all of the `TypeVar` types and just stick with all concrete addrs
  types inheriting from `Address` only.
- rename `Address.name_key` -> `.proto_key`.
- rename `Address.address_type` -> `.unwrapped_type`
- rename `.namespace` -> `.bindspace` to better reflect that this "part"
  of the address represents the possible "space for binding endpoints".
 |_ also linux already uses "namespace" to mean the `netns` and i'd
   prefer to stick with their semantics for that.
- add `TCPAddress/UDSAddress.def_bindspace` values.
- drop commented `.open_stream()` method; never used.
- simplify `UnwrappedAdress` to just a `tuple` of union types.
- add logging to `USDAddress.open_listener()` for now.
- adjust `tractor.ipc/_uds/tcp` transport to use new addr field names.
And map `.__repr__/__str__` to it. Also adjust to new
`Address.proto_key` and add a #TODO for a `.get_peers()`.
goodboy and others added 20 commits April 6, 2025 22:06
That is moving from `._addr`,
- `TCPAddress` to `.ipc._tcp`
- `UDSAddress` to `.ipc._uds`

Obviously this requires adjusting a buncha stuff in `._addr` to avoid
import cycles (the original reason the module was not also included in
the new `.ipc` subpkg) including,

- avoiding "unnecessary" imports of `[Unwrapped]Address` in various modules.
  * since `Address` is a protocol and the main point is that it **does
    not need to be inherited** per
    (https://typing.python.org/en/latest/spec/protocol.html#terminology)
    thus I removed the need for it in both transport submods.
  * and `UnwrappedAddress` is a type alias for tuples.. so we don't
    really always need to be importing it since it also kinda obfuscates
    what the underlying pairs are.
- not exporting everything in submods at the `.ipc` top level and
  importing from specific submods by default.
- only importing various types under a `if typing.TYPE_CHECKING:` guard
  as needed.
Primarily moving the `Actor._serve_forever()`-task-as-method and
supporting actor-instance attributes to a new `.ipo._server` sub-mod
which now encapsulates,
- the coupling various `trio.Nursery`s (and their independent lifetime mgmt)
  to different `trio.serve_listener()`s tasks and `SocketStream`
  handler scopes.
- `Address` and `SocketListener` mgmt and tracking through the idea of
  an "IPC endpoint": each "bound-and-active instance" of a served-listener
  for some (varied transport protocol's socket) address.
- start and shutdown of the entire server's lifetime via an `@acm`.
- delegation of starting/stopping tpt-protocol-specific `trio.abc.Listener`s
  to the corresponding `.ipc._<proto_key>` sub-module (newly defined
  mod-top-level instead of `Address` method) `start/close_listener()`
  funcs.

Impl details of the `.ipc._server` sub-sys,
- add new `IPCServer`, allocated with `open_ipc_server()`, and which
  encapsulates starting multiple-transport-proto-`trio.abc.Listener`s
  from an input set of `._addr.Address`s using,
  |_`IPCServer.listen_on()` which internally spawns tasks that delegate to a new
    `_serve_ipc_eps()`, a rework of what was (effectively)
    `Actor._serve_forever()` and which now,
    * allocates a new `IPCEndpoint`-struct (see below) for each
      address-listener pair alongside the specified
      listener-serving/stream-handling `trio.Nursery`s provided by the
      caller.
    * starts and stops each transport (socket's) listener by calling
      `IPCEndpoint.start/close_listener()` which in turn delegates to
      the underlying `inspect.getmodule(IPCEndpoint.addr)` backend tpt
      module's equivalent impl.
    * tracks all created endpoints in a `._endpoints: list[IPCEndpoint]`
      which is further exposed through public properties for
      introspection of served transport-protocols and their addresses.
  |_`IPCServer._[parent/stream_handler]_tn: Nursery`s which are either
     allocated (in which case, as the same instance) or provided by the
     caller of `open_ipc_server()` such that the same nursery-cancel-scope
     controls offered by `trio.serve_listeners(handler_nursery=)` are
     offered where the `._parent_tn` is used to spawn `_serve_ipc_eps()`
     tasks, and `._stream_handler_tn` is passed verbatim as `handler_nursery`.
- a new `IPCEndpoint`-struct (as mentioned) which wraps each
  transport-proto's address + listener + allocated-supervising-nursery
  to encapsulate the "lifetime of a server IPC endpoint" such that
  eventually we can track and managed per-protocol/address/`.listen_on()`-call
  scoped starts/stops/restarts for the purposes of filtering/banning
  peer traffic.
  |_ also included is an unused `.peer_tpts` table which we can
    hopefully use to replace `Actor._peers` in a `Channel`-tracking
    transport-proto-aware way!

Surrounding changes to `.ipc.*` primitives to match,
- make `[TCP|UDS]Address` types `msgspec.Struct(frozen=True)` and thus
  drop any-and-all `addr._host =` style mutation throughout.
  |_ as such also drop their `.__init__()` and `.__eq__()` meths.
  |_ UDS tweaks to field names and thus `.__repr__()`.
- move `[TCP|UDS]Address.[start/close]_listener()` meths to be mod-level
  equiv `start|close_listener()` funcs.
- just hard code the `.ipc._types._key_to_transport/._addr_to_transport`
  table entries instead of all the prior fancy dynamic class property
  reading stuff (remember, "explicit is better then implicit").

Modified in `._runtime.Actor` internals,
- drop the `._serve_forever()` and `.cancel_server()`, methods and
  `._server_down` waiting logic from `.cancel_soon()`
- add `.[_]ipc_server` which is opened just after the `._service_n` and
  delegate to it for any equivalent publicly exposed instance
  attributes/properties.
Such that re-wrapping/raising from a low-level `trio` resource error is
simpler and includes the `.src_exc` in the `__repr__()` and
`.message/.args` rendered at higher layers (like from `Channel` and
`._rpc` machinery).

Impl deats,
- mainly leverages packing in a new cls-method `.repr_src_exc() -> str:`
  repr of the underlying error before an optional `body: str` all as
  handled by the previously augmented `.pformat()`'s delegation to
  `pformat_exc()`.
- change `.src_exc` to be a property around a renamed `._src_exc`.

But wait, why?
- use it inside `MsgpackTransport.send()` to rewrap any
  `trio.BrokenResourceError`s so we always see the underlying
  `trio`-src-exc just like in the `.recv()._iter_packets()` handlers.
Just like we *were* for the `trio`-resource-errors it normally wraps
since we now also do the same wrapping in `MsgpackTransport.send()`
and we don't normally care to raise tpt-closure-errors on graceful actor
cancel requests.

Also, warn-report any non-tpt-closed low-level `trio` errors we haven't
yet re-wrapped (likely bc they haven't shown up).
For now just wrapping wtv the `._def_tpt_proto` per-actor setting is.
…one`

Also ensure we assertion-error whenever the list is > 1 entry for now!
Such that the global test-session always (and only) runs against the CLI
specified `--tpt-proto=` transport protocol.
Namely while what I was actually trying to solve was why
`TransportClosed` was getting raised from `Portal.cancel_actor()` but
still useful edge case auditing either way. Also opts into the
`debug_mode` fixture with apprope timeout adjustment B)
Call it `handle_stream_from_peer()` and bind in the `actor: Actor` via
a `handler=partial()` to `trio.serve_listeners()`.

With this (minus the `Actor._peers/._peer_connected/._no_more_peers`
attrs ofc) we get nearly full separation of IPC-connection-processing
(concerns) from `Actor` state. Thus it's a first look at modularizing
the low-level runtime into isolated subsystems which will hopefully
improve the entire code base's grok-ability and ease any new feature
design discussions especially pertaining to introducing and/or
composing-together any new transport protocols.
Namely transferring the `Actor` peer-`Channel` tracking attrs,
- `._peers` which maps the uids to client channels (with duplicates
  apparently..)
- the `._peer_connected: dict[tuple[str, str], trio.Event]` child-peer
  syncing table mostly used by parent actors to wait on sub's to connect
  back during spawn.
- the `._no_more_peers = trio.Event()` level triggered state signal.

Further we move over with some minor reworks,
- `.wait_for_peer()` verbatim (adjusting all dependants).
- factor the no-more-peers shielded wait branch-block out of
  the end of `async_main()` into 2 new server meths,
  * `.has_peers()` with optional chan-connected checking flag.
  * `.wait_for_no_more_peers()` which *just* does the
    maybe-shielded `._no_more_peers.wait()`
box_types: dict[Type, Struct] = {}
for ext_type in ext_types:
info = msgspec.inspect.type_info(ext_type)
if isinstance(info, CustomType):
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

docs on all this in at least the doc-string would be great ;)

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also obvi linking the msgpec docs where possible as well 🙏🏼

# yield ext_codec
#
# ^-TODO-^ is it impossible to make something like this orr!?
# builtins we can have in same pld_spec as custom types
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

says who? link to msgspec docs pretty please :)

goodboy added a commit that referenced this pull request Apr 26, 2025
Shared memory array API and optional tight integration with `numpy`

Landing this so many downstream major feature branches depend on it namely,
- #375
- #376
- and the eventual https://pikers.dev/goodboy/tractor/pulls/10
@goodboy goodboy changed the title CI uv update + enc_hook & dec_hook factory enc_hook & dec_hook factory (+ CI uv update) Jun 21, 2025
@goodboy goodboy force-pushed the structural_dynamics_of_flow branch from fbc9325 to 27e6ad1 Compare July 11, 2025 02:05
Base automatically changed from structural_dynamics_of_flow to main July 13, 2025 19:11
@goodboy
Copy link
Owner

goodboy commented Jul 13, 2025

@guilledk trying to figure out what we can actually distill from this without bringing in too much more then delegating to msgspec.

Also this needs a rebase!

@goodboy
Copy link
Owner

goodboy commented Aug 20, 2025

@guilledk needs a big rebase, but also this ended up getting moved elsewhere yah?

@goodboy
Copy link
Owner

goodboy commented Sep 25, 2025

Gotta rebase this one obvi but i think despite most of this stuff in a more refined form (i think) getting moved elsewhere, there's still a couple things we might want to keep. So i will be going through it, maybe updating and landing some content.

@goodboy goodboy marked this pull request as draft September 25, 2025 20:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants