Commanded.aggregate_state does not work when aggregate identity has a prefix #516
Comments
You can just call aggregate_state with the prefixed id, so instead of calling |
yeah exactly, that's also my workaround, but then the docs and typespec of that function need to be changed |
But now I'm also a bit confused about whether having a router on a Commanded application is required. It seems that it's technically not required for dispatching, but if you don't do this some features don't work (fetching of state but also |
Ok, I have reached the conclusion that basically Commanded expects that the routing info is present on the application because otherwise process managers can't dispatch commands. It does make me wonder though why the public dispatch api even exists on routers directly (this seems to suggest applications don't have to have the routing info per se). In other words: I think we can assume that the routing information is present on the application and thus I can submit a PR that looks up the prefix from the application composite router (from a compile-time exposed |
The command router module existed before Commanded added the application module to support multiple applications and event stores. It is still possible to dispatch commands directly via a router but you must pass the Commanded application as an option:

:ok = BankRouter.dispatch(command, application: BankApp)
Yeah, that's true, but still, you can't use Process managers. Should I open a PR to document this? Or should we allow process managers to be mounted on routers? I guess they only need the dispatch info? |
To fix the issue we'd need to add an optional argument to the function to specify the aggregate prefix as it is not possible to get the prefix except via a command registered with a router. |
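A minimal sketch of what such an optional argument could look like, in plain Elixir. The module name `AggregateStateSketch`, the function `stream_uuid/2`, and the `:prefix` option are all hypothetical and not part of Commanded; the sketch only shows how a caller-supplied prefix would be joined to the aggregate id before the state lookup.

```elixir
defmodule AggregateStateSketch do
  @moduledoc """
  Hypothetical helper, not part of Commanded. Builds the id to hand to
  `aggregate_state` from an aggregate id and an optional `:prefix` option.
  """

  @spec stream_uuid(String.t(), keyword()) :: String.t()
  def stream_uuid(aggregate_uuid, opts \\ []) when is_binary(aggregate_uuid) do
    # Assumption: an absent prefix means the id is used unchanged.
    prefix = Keyword.get(opts, :prefix, "")
    prefix <> aggregate_uuid
  end
end

# With a prefix, the caller gets the same id the router would have produced.
AggregateStateSketch.stream_uuid("ACC123", prefix: "bank-account-")

# Without one, the id passes through untouched.
AggregateStateSketch.stream_uuid("ACC123")
```

The result would then be passed on as the id, for example `BankApp.aggregate_state(BankAccount, stream_uuid)`.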
If we can't make the assumption generally that the router is embedded on the application, then indeed it's not possible. But if we have the router then we can expose the identities at compile time just like |
Regarding the process manager, I see #517 was opened. |
Follow me along the journey I took to use

Terminology

If I understand correctly...

And the result of joining the optional prefix is also called the "aggregate's identity". In code the value is again labeled

The router

identify BankAccount,
  by: :account_number,
  prefix: "bank-account-"

The command is dispatched and the default middleware

def before_dispatch(%Pipeline{} = pipeline) do
  with aggregate_uuid when aggregate_uuid not in [nil, ""] <- extract_aggregate_uuid(pipeline),
       aggregate_uuid when is_binary(aggregate_uuid) <- identity_to_string(aggregate_uuid),
       aggregate_uuid when is_binary(aggregate_uuid) <- prefix(aggregate_uuid, pipeline) do
    assign(pipeline, :aggregate_uuid, aggregate_uuid)

The

{:ok, ^aggregate_uuid} =
  Commanded.Aggregates.Supervisor.open_aggregate(
    application,
    aggregate_module,
    aggregate_uuid
  )

And

case EventStore.stream_forward(
       application,
       aggregate_uuid,
       aggregate_version + 1,
       @read_event_batch_size
     ) do
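The identity steps in the walkthrough above can be modelled in plain Elixir. This is a simplified sketch, not Commanded's actual middleware: `IdentitySketch` and its helper are hypothetical names, and the command is assumed to be a plain map or struct. It only shows that the value used to open the aggregate and read its stream is the prefix joined to the extracted id.

```elixir
defmodule IdentitySketch do
  @moduledoc "Simplified model of identity extraction; not Commanded's code."

  # `:by` is either a field name (atom) or a one-arity function,
  # matching the two forms used with `identify` in this thread.
  def aggregate_uuid(command, opts) do
    by = Keyword.fetch!(opts, :by)
    prefix = Keyword.get(opts, :prefix, "")

    case extract(command, by) do
      # A missing or empty id cannot identify an aggregate instance.
      id when id in [nil, ""] -> {:error, :invalid_aggregate_identity}
      # Otherwise stringify the id and join the prefix in front of it.
      id -> {:ok, prefix <> to_string(id)}
    end
  end

  defp extract(command, by) when is_atom(by), do: Map.get(command, by)
  defp extract(command, by) when is_function(by, 1), do: by.(command)
end

command = %{account_number: "ACC123"}

IdentitySketch.aggregate_uuid(command, by: :account_number, prefix: "bank-account-")
# => {:ok, "bank-account-ACC123"}
```

Calling `aggregate_state` with only `"ACC123"` would therefore point at a different stream than the one the dispatch path wrote to.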
Documentation may be unclear

This paragraph under 'Define aggregate identity' in the router docs could produce misunderstanding.
The previous paragraph describes the aggregate identity. In the simple case, the identity is typically an aggregate id field, hence
And change "The prefix is used as the stream identity" to correct and break into two concepts:
So then is the typespec correct because the

I think the identity documentation under the Commands guide is more thorough. Maybe the module docs could link to the guide to better understand the why.
Oh, I see the reasoning better now. The prefix labels the group of event streams that belong to a certain aggregate module. The rest of the identity (often a UUID from an id field) reflects the specific instance of the aggregate. So the documentation could be clarified regarding a module vs an instance of an aggregate:
Access aggregate state during command dispatch is direct
Access from elsewhere such as a Process manager is not direct

In the simple case the caller joins the prefix string to provide the

My solution
defmodule Aggregates.UserThing do
  def aggregate_identity(%{user_uuid: user_uuid}) do
    "user-thing-#{user_uuid}"
  end
end

# Can even configure identity in the router now without using prefix key
identify(UserThing, by: &UserThing.aggregate_identity/1)

defmodule Aggregates.UserThing do
  def identity_config do
    [
      by: :user_uuid,
      prefix: "user-thing-"
    ]
  end
end

# And then use it in the router
identify(UserThing, UserThing.identity_config())

# Copy logic from Commanded.Middleware.ExtractAggregateIdentity into own module
defmodule MyApp.CommandedIdentity do
  @moduledoc """
  Extracts the target aggregate's identity.
  Logic from `Commanded.Middleware.ExtractAggregateIdentity`.
  """

  defmodule Config do
    @enforce_keys [:command, :identity, :identity_prefix]
    defstruct [:command, :identity, :identity_prefix]
  end

  alias MyApp.CommandedIdentity.Config

  def aggregate_uuid(message, opts) when is_list(opts) do
    opts =
      Keyword.new(opts, fn
        {:by, value} -> {:identity, value}
        {:prefix, value} -> {:identity_prefix, value}
      end)
      |> Keyword.put(:command, message)

    config = struct!(Config, opts)
    aggregate_uuid(config)
  end

  def aggregate_uuid(%Config{} = config) do
    [...]
  end
end

To use it.

aggregate_uuid = MyApp.CommandedIdentity.aggregate_uuid(command_or_query, UserThing.identity_config())
state = CommandedApp.aggregate_state(UserThing, aggregate_uuid)

I determined I didn't need the complex case complications. I went with option 1 to query a simple aggregate state.
I noticed that Commanded.aggregate_state does not work with aggregates that have an identity prefix defined in the router. The returned value is an empty aggregate, because it thinks it's a non-started aggregate.
There are a few things here I'd like to discuss:
Can this "bug" even be fixed? With only the application and aggregate we don't necessarily have access to the routing info, right? So is the "fix" then to change the argument from aggregate_uuid :: Aggregate.uuid() to stream_uuid :: String.t()? (So that the caller has to prefix the aggregate themselves.)
Shouldn't this function raise or return an error when an invalid ID is given instead of an empty aggregate?
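To make both discussion points concrete, here is a toy in-memory model in plain Elixir. It is an assumption-laden sketch, not how Commanded stores or rebuilds state: `ToyAggregateState`, its event tuples, and the `:stream_not_found` error are invented for illustration. It shows why an unprefixed id yields an empty aggregate, and what returning an error instead might look like.

```elixir
defmodule ToyAggregateState do
  @moduledoc "Toy model; the map below stands in for the event store."

  # Streams are keyed by the full, prefixed identity.
  @streams %{"bank-account-ACC123" => [{:deposited, 100}, {:withdrawn, 30}]}

  # Mirrors the reported behaviour: an unknown stream has no events,
  # so the fold returns the untouched initial state.
  def state(stream_uuid) do
    @streams
    |> Map.get(stream_uuid, [])
    |> Enum.reduce(%{balance: 0}, &apply_event/2)
  end

  # One possible alternative: surface a missing stream as an error.
  def fetch_state(stream_uuid) do
    if Map.has_key?(@streams, stream_uuid) do
      {:ok, state(stream_uuid)}
    else
      {:error, :stream_not_found}
    end
  end

  defp apply_event({:deposited, amount}, state), do: %{state | balance: state.balance + amount}
  defp apply_event({:withdrawn, amount}, state), do: %{state | balance: state.balance - amount}
end

ToyAggregateState.state("ACC123")
# => %{balance: 0}
ToyAggregateState.state("bank-account-ACC123")
# => %{balance: 70}
ToyAggregateState.fetch_state("ACC123")
# => {:error, :stream_not_found}
```

In this model, the first call is indistinguishable from a brand-new aggregate, which is the ambiguity the second question is about.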