
pg: distributed named process groups #2524

Merged (1 commit, Feb 6, 2020)

Conversation

@max-au (Contributor) commented Feb 2, 2020

Replacement for the pg2 module. Differences (compared to pg2):

  • non-existent and empty groups are treated the same (an empty list of pids),
    so create/1 and delete/1 have no effect (and are not implemented);
    which_groups() returns only non-empty groups
  • no cluster lock is required, and there is no dependency on global
  • all join/leave operations require a local process (it is not possible to join
    a process from a different node)
  • multi-join: join/leave several processes with a single call

Empty groups are not supported: unlike a process, a group does not have an
originating node, so it is possible that during a net split one node deletes
a group that still exists in another partition. pg2 re-creates the deleted
group as soon as the net split heals, which is quite unexpected.

Process groups can be organised into multiple scopes. Scopes are
completely independent of each other. A process may join any
number of groups in any number of scopes. Scopes are designed to
decouple a single mesh into a set of overlay networks, reducing the
amount of traffic required to propagate group membership
information.
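
A minimal usage sketch of the API described above (the scope name backend and the group name workers are made up for illustration):

```erlang
%% Usage sketch only; 'backend' and 'workers' are invented names.
{ok, _} = pg:start_link(),            %% default scope (named 'pg')
{ok, _} = pg:start_link(backend),     %% an additional, independent scope

Pid1 = spawn(fun() -> receive stop -> ok end end),
Pid2 = spawn(fun() -> receive stop -> ok end end),

%% join/leave accept only local processes; multi-join takes a list of pids
ok = pg:join(workers, Pid1),                   %% group in the default scope
ok = pg:join(backend, workers, [Pid1, Pid2]),  %% multi-join in 'backend'

[_, _] = pg:get_members(backend, workers),
[workers] = pg:which_groups(backend),

%% leaving the last members makes the group disappear; there is no delete/1
ok = pg:leave(backend, workers, [Pid1, Pid2]),
[] = pg:get_members(backend, workers),
[] = pg:which_groups(backend).
```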

@okeuday (Contributor) commented Feb 2, 2020

@max-au This source code is based on an older version of https://github.com/okeuday/cpg/. It would be good to give credit to where the approach came from.

@rickard-green added the team:VM (Assigned to OTP team VM) label on Feb 3, 2020
@max-au (Contributor, Author) commented Feb 3, 2020

This code is inspired by Erlang/OTP pg2, distributed process groups. It does not share any ideas, approaches, or code with any library I am aware of.
Originally it used the pg2 exchange protocol for compatibility purposes.

@vans163 (Contributor) commented Feb 3, 2020

TBF, it's literally just calling monitor on a node + broadcasting a msg + writing to ets. I would understand if this was some kind of intricate invention, like a new consensus algorithm to compete with Raft, etc.

@max-au Would this deprecate spg to some degree? The current pg2 really does suck in production, especially if you cannot guarantee 1 ms latency within the cluster, and don't even think of using pg2 in a cross-datacenter deployment.

@max-au (Contributor, Author) commented Feb 3, 2020

I will continue maintaining spg for compatibility purposes, and for the ability to move faster on the new features planned.

@okeuday (Contributor) commented Feb 3, 2020

@max-au The https://github.com/okeuday/cpg/ source code was also inspired by pg2 though it has become an important part of CloudI since the 1.1.0 release on 2012-10-30 (http://erlang.org/pipermail/erlang-questions/2012-October/070298.html). The cpg code changed to use a gen_server:call for join/leave in the CloudI 1.3.0 release on 2013-10-20 (http://erlang.org/pipermail/erlang-questions/2013-October/075679.html) as shown at https://github.com/okeuday/cpg/blob/v1.3.0/src/cpg.erl. At that time, cpg had pg2 compatibility source code though CloudI always used create/delete as no-ops with join/leave limited to local pids (no global:trans use).

The reason this is important is that the approach is its own type of CRDT that avoids the availability problems of gproc and syn (their focus on leader election and conflict resolution limits their scalability and availability). The cpg approach lacks general acceptance in the Erlang/Elixir community, so the pg source code may not get accepted by Erlang/OTP. Either way, the algorithm is important and seems best thought of as similar to a POLog CRDT without the need for vector clocks, because causality is established by the order of join/leave in the Erlang process message queue (an ordered log). More info at https://github.com/okeuday/cpg/#design .

@rickard-green (Contributor) commented:

@okeuday

I've had a quick look and I have a hard time seeing that these are the same approaches.

I don't see utilization of Erlang's signal-order guarantees as an argument for that either. It is an essential part of the language which is expected to be utilized by most concurrent Erlang software, and a lot of software does that and has been around for a very long time.

@rickard-green self-assigned this on Feb 4, 2020
@okeuday (Contributor) commented Feb 4, 2020

@rickard-green Yes, all Erlang processes having a message queue that is processed sequentially is an essential part of the language. The common algorithm treats join and leave operations as deltas (changes in state) to the process group membership, applied sequentially and synchronously on the local node, with the change occurring asynchronously on remote nodes. This may seem obvious and just like pg2, but it requires the local-pid-only constraint to ensure consistency locally without locking (such as pg2's usage of global:trans).

The other direct dependency on Erlang functionality is the node monitoring for handling netsplits, to ensure state changes during downtime get exchanged once the node reappears. There are various differences in the approach taken by cpg and spg: I prefer using cached data in cpg, while spg uses ets, and the cpg data exchange after a netsplit deals with a history list instead of a list of local pids. However, those differences are variations on the same idea of avoiding the global locking present in pg2.
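
As a rough illustration of that shared idea only (the module name, message formats, and state layout below are invented; this is not the cpg, spg, or pg source): a scope process handles join synchronously for local pids, forwards the change to remote peers asynchronously, and watches node events to know when to re-exchange state after a netsplit:

```erlang
-module(pg_idea_sketch).
-behaviour(gen_server).
-export([start_link/0, join/2, get_members/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

join(Group, Pid) when node(Pid) =:= node() ->       %% local pids only
    gen_server:call(?MODULE, {join, Group, Pid}).   %% synchronous locally

get_members(Group) ->
    gen_server:call(?MODULE, {members, Group}).

init([]) ->
    ok = net_kernel:monitor_nodes(true),             %% nodeup/nodedown events
    {ok, #{}}.

handle_call({join, Group, Pid}, _From, Groups) ->
    %% apply the delta locally in message-queue order, then broadcast it
    %% asynchronously; per-peer ordering is given by Erlang signal ordering
    gen_server:abcast(nodes(), ?MODULE, {delta_join, Group, Pid}),
    {reply, ok, add_member(Group, Pid, Groups)};
handle_call({members, Group}, _From, Groups) ->
    {reply, maps:get(Group, Groups, []), Groups}.

handle_cast({delta_join, Group, Pid}, Groups) ->     %% delta from a peer
    {noreply, add_member(Group, Pid, Groups)}.

handle_info({nodeup, _Node}, Groups) ->
    %% a full exchange of local members would happen here after a netsplit
    {noreply, Groups};
handle_info(_Other, Groups) ->
    {noreply, Groups}.

add_member(Group, Pid, Groups) ->
    maps:update_with(Group, fun(Pids) -> [Pid | Pids] end, [Pid], Groups).
```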

@rickard-green (Contributor) commented:

@okeuday Sorry, I do not see how you can claim that this is a special approach that is yours: serializing communication over a local server, and utilizing net_kernel's functionality for detecting remote nodes (which pg2 does as well).

It is also unreasonable to assume that someone trying to solve the same problem as you (improving pg2) must have looked at your code just because there are some similarities. The problem is the same, so of course there will be similarities.

@michalmuskala (Contributor) commented:

Going back to the proposed change, I think the name pg might be problematic, given we already have pg2. For somebody new, just coming to the language, and exploring the standard library, it will seem obvious to look at pg2 instead of pg first, since it looks like the newer version.

I see two possible solutions: picking a name without this issue (though I think pg is a good name), or making it obvious pg is newer and better than pg2 in some other way.

@essen (Contributor) commented Feb 4, 2020

Note that there used to be a pg module as well; it was removed in 2014 in 0e38f3d. This might be worth mentioning as well.

@okeuday (Contributor) commented Feb 4, 2020

@rickard-green The cpg work in CloudI predates spg by many years. If the improvement to pg2 were trivial, I am sure the Erlang/OTP team would have avoided the pg2 global lock sooner, given the complaints related to pg2. I am not claiming I own the algorithm, only that there is previous work you are casually ignoring.

I didn't accuse anyone of intentionally taking from cpg. I am just skeptical of the situation because I see WhatsApp Inc. as the copyright holder and I know they have been accused of copying features in the past (regarding iMessage in iOS 10). I have no way to influence the situation; I just think it is a poor reflection of the Erlang/Elixir community.

@max-au (Contributor, Author) commented Feb 4, 2020

@okeuday I looked at the old cpg code you suggested (https://github.com/okeuday/cpg/blob/v1.3.0/src/cpg.erl) and found no similarities with spg.
There are several key points implemented in spg that are not properties of cpg, or of any other registry known to me (a rough sketch of points 2–4 follows the list):

  1. spg builds an overlay network that connects processes on Erlang nodes. This is the main and most important feature; it defines spg behaviour and is the key to understanding how spg works. All other distributed process group implementations known to me operate at the node level. The list of nodes is not stored in the process state and can change at any time, while the list of overlay peers is.
  2. As a consequence of (1), the spg process does not monitor nodes (other than for discovery purposes), but monitors a single spg process per remote node.
  3. The spg process itself monitors locally joined processes.
  4. The spg process uses ETS to avoid a single point of contention (the gen_server process), but that is what most registries do.
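
A fragmentary sketch of points 2–4 above (the function and table names here are invented; this is not the spg or pg source):

```erlang
-module(scope_sketch).
-export([init_table/0, get_members/1, on_peer_up/1, on_local_join/2]).

%% 4. reads never touch the scope process: members are kept in a protected
%%    ETS table owned by it, so there is no single point of contention
init_table() ->
    ets:new(?MODULE, [set, named_table, protected, {read_concurrency, true}]).

get_members(Group) ->
    case ets:lookup(?MODULE, Group) of
        [{Group, Pids}] -> Pids;
        []              -> []
    end.

%% 2. monitor the single peer scope *process* on a remote node, not the node
on_peer_up(PeerScopePid) when is_pid(PeerScopePid) ->
    erlang:monitor(process, PeerScopePid).

%% 3. monitor each locally joined process so a crash removes it everywhere
%%    (the insert overwrites previous members for brevity; a real
%%    implementation would merge the lists)
on_local_join(Group, Pid) when node(Pid) =:= node() ->
    MRef = erlang:monitor(process, Pid),
    true = ets:insert(?MODULE, {Group, [Pid]}),
    MRef.
```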

@okeuday (Contributor) commented Feb 5, 2020

@max-au Yes, I never attempted to say that cpg and spg operate in the same way. The most obvious and basic similarity is calling locally registered processes a scope, since the concept of scopes was introduced as a service configuration option for CloudI services, to isolate process group membership (in CloudI v1.2.5, CloudI/CloudI@f4c5fbe ). The cpg approach for monitors more closely matches pg2 but more recent versions of cpg put the monitors into separate node-specific processes to isolate the monitor DOWN messages and group the contents in a single message, to reduce the burden on the scope process.

You may consider the monitors to be forming an overlay network, but I could likely use a similar view to say cpg scope processes make an overlay network. However, asserting that spg or cpg creates an overlay network isn't entirely accurate when considering the distributed Erlang node connections, because those connections are fully-connected by default. The recent releases of cpg do have source code for listening to hidden node connections, which avoids the fully-connected network topology, but without that functionality, the spg overlay network concept is abstract and not related directly to node connections. Unfortunately net_kernel:hidden_connect_node/1 remains undocumented, so hopefully that can get documented as a permanent function sometime in the future.

I understand the spg process's use of ETS is more typical compared to other process registries, to avoid using the Erlang process message queue and creating contention. The cpg scope processes can always use cpg_data functions to get the internal cpg scope state and access the cached state directly. That is the difference between the immediate and lazy destination refresh methods in CloudI, with immediate going directly to the scope process and lazy using cached local data that is sometimes stale (as described at http://cloudi.org/api.html#1_Intro_dest ). Using the cached state is quicker than ETS, but can be stale, so it is a design choice in cpg that allows cpg to avoid using global state (keeping the source code referentially transparent).

I was focusing on the important commonality that spg and cpg share, which is avoiding the global lock in pg2 by limiting join and leave operations to local pids in the same way (not having create or delete); that is an important concept and should be in an Erlang/OTP process group solution.

@essen (Contributor) commented Feb 5, 2020

Are there any benchmarks to demonstrate the scalability improvements of this over other libraries?

@vans163 (Contributor) commented Feb 5, 2020

> Are there any benchmarks to demonstrate the scalability improvements of this over other libraries?

There's no need for benchmarks; just look at the underlying implementation. This does not block anywhere (no calls, only casts), while pg2 places a global lock on all the nodes in the cluster for each operation. What would the benchmark be testing? The time the global lock is held? In that case, if there is jitter in latency, say one test is done "in the cloud" and another on bare metal with physical (not virtual) switches, the results could be vastly different.
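
For context, this is roughly the shape of pg2's write path as I remember it (paraphrased from memory, not the exact OTP source): each operation takes a cluster-wide lock through global and makes a synchronous call to every node while holding it:

```erlang
%% Paraphrased sketch of pg2's approach, for illustration only.
-module(pg2_lock_sketch).
-export([join/2]).

join(Name, Pid) when is_pid(Pid) ->
    %% cluster-wide lock, then a blocking call to every connected node
    global:trans({{pg2, Name}, self()},
                 fun() ->
                     gen_server:multi_call(pg2, {join, Name, Pid})
                 end),
    ok.
```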

@essen (Contributor) commented Feb 5, 2020

> Are there any benchmarks to demonstrate the scalability improvements of this over other libraries?

> There's no need for benchmarks; just look at the underlying implementation. This does not block anywhere (no calls, only casts), while pg2

Hm, virtually nobody is using pg2, though I suppose it could be compared to as a baseline. But then we already have gproc, syn, cpg and others and it would be good to know where this stands to know whether this being included in OTP is a big deal or whether it's just a better pg2.

@rickard-green added the enhancement and testing (currently being tested, tag is used by OTP internal CI) labels on Feb 5, 2020
@vans163 (Contributor) commented Feb 5, 2020

> Are there any benchmarks to demonstrate the scalability improvements of this over other libraries?

> There's no need for benchmarks; just look at the underlying implementation. This does not block anywhere (no calls, only casts), while pg2

> Hm, virtually nobody is using pg2, though I suppose it could be compared to as a baseline. But then we already have gproc, syn, cpg and others and it would be good to know where this stands to know whether this being included in OTP is a big deal or whether it's just a better pg2.

It would be nice to have a proper test bed for this. Maybe some kind of docker-compose or similar setup that can bring up 1 / 2 / 5 / 10 / 100 / (1000) nodes, then introduce fake latency between them, say <0.01 ms (localhost), 1 ms (local net), 4 ms (cross-region), 50 ms (cross-state), 100 ms (continental), 200 ms (intercontinental), 500 ms (global), 2000 ms (China).

Then place each implementation in the bench and draw some fancy charts.

@puzza007 (Contributor) commented Feb 5, 2020

> It would be nice to have a proper test bed for this. Maybe some kind of docker-compose or similar setup that can bring up 1 / 2 / 5 / 10 / 100 / (1000) nodes, then introduce fake latency between them, say <0.01 ms (localhost), 1 ms (local net), 4 ms (cross-region), 50 ms (cross-state), 100 ms (continental), 200 ms (intercontinental), 500 ms (global), 2000 ms (China).

> Then place each implementation in the bench and draw some fancy charts.

@AeroNotix did some work testing the partition tolerance of various process groups using Jepsen here https://github.com/AeroNotix/jtpg

@AeroNotix (Contributor) commented Feb 5, 2020

@puzza007 linked to something I made a few years ago that relies on Jepsen (http://jepsen.io/), which is aimed at discovering failure modes in distributed systems.

It would be best served by being updated to pull in the newer Jepsen versions, as they have had many improvements over the years.

Well worth doing this imho though!

I ran cpg for a good few years in production at what I would say is a decent load; cpg at the time had many failure modes, mainly centred around thundering-herd-type problems, recovery after a netsplit, or even just rolling a release into a new cluster (carrying state over). It often got stuck processing a backlog of messages, with no way to deal with it other than to either wait for cpg to handle the backlog or to restart the cluster. If @max-au's code hasn't taken code, or even inspiration, from cpg, I would say that's a good thing.

I tried all I could to help cpg along: sharding into more groups, and aggregating messages into pools of cpg members by reducing the granularity of which messages get routed to which node member (e.g. sending messages to the whole cluster, through fewer cpg members, rather than to individual pids).

I wish I still had access to that system, to weigh in more concretely on how much of a good thing not being based on cpg is.

@okeuday (Contributor) commented Feb 5, 2020

While all three of us (@puzza007, @AeroNotix, @okeuday) were working with Ubiquiti at the time, I had no information about the Jepsen testing or any real input into it. I only received the trolling related to it, during and after, so I have no way to confirm the validity of the testing; I only see it as consultants mudslinging. The cpg testing related to syn also had problems, which are in the comments on that blog post (not sure where that is). I prefer going with the CloudI load test data, which is focused on service request rate, to avoid complexity, but I am very interested in legitimate testing.

@AeroNotix (Contributor) commented Feb 5, 2020

The Jepsen testing was primarily related to pg2, not cpg, though I did give cpg a whirl with it for a lark. There's no mudslinging going on here. It's a technical discussion about the merits and failures of one process group implementation over another.

You do, however, have ways to confirm its validity: the code is up there. The Jepsen tests were related to the consistency and partition tolerance, not the scalability, of process group implementations. My issues with cpg's lack of scalability came many years after I tested with Jepsen; implementing a synthetic benchmark for any process group implementation would be relatively trivial. I routinely ran into issues with cpg under load, even when pg2 would handle similar load. So much for pg2 being a poor choice.

Scalability seems to be a bit off-topic for what you're claiming (stolen code, is it now?) anyway; I am not sure a further derailment of the pull request is in order. Perhaps you could supply benchmarks if you are so inclined; I would look forward to seeing them indeed!

EDIT: found in the Wayback Machine, where Ostinelli benchmarked various process group implementations: https://web.archive.org/web/20190126202132/http://www.ostinelli.net/an-evaluation-of-erlang-global-process-registries-meet-syn/

EDIT2: the timeout/monitoring issues remind me of the typical issues I would see with cpg. Unavoidable, even when sharding groups? It made me uneasy using it until I just wholesale replaced it with pg2, which never had issues like that.

@rickard-green (Contributor) commented Feb 6, 2020

@michalmuskala wrote:

> Going back to the proposed change, I think the name pg might be problematic, given we already have pg2. For somebody new, just coming to the language, and exploring the standard library, it will seem obvious to look at pg2 instead of pg first, since it looks like the newer version.
>
> I see two possible solutions: picking a name without this issue (though I think pg is a good name), or making it obvious pg is newer and better than pg2 in some other way.

pg2 will be deprecated in OTP 23 and scheduled for removal in OTP 24.

@essen wrote:

> Note that there used to be a pg module as well; it was removed in 2014 in 0e38f3d. This might be worth mentioning as well.

I will update the documentation to reflect this.

@rickard-green removed the testing (currently being tested, tag is used by OTP internal CI) label on Feb 6, 2020
@AeroNotix (Contributor) commented:

@max-au do you have a blog post covering this implementation or spg?

@essen (Contributor) commented Feb 6, 2020

Any reasons why this (or pg2) doesn't include support for the via mechanism?

@josevalim (Contributor) commented:

@essen I think it makes sense for start_link, but the overall usage is probably awkward. For example, what happens on gen_server:call({via, pg, some_key}, request)? Should you pick one entry? If so, which strategy do you use? Or should you call them all? Similarly, what should pg:whereis_name return?

@okeuday (Contributor) commented Feb 6, 2020

If you needed to copy via support from cpg, it would be at https://github.com/okeuday/cpg/blob/3b3ee6270ec7988082c8cee247e4cfc3566c3a42/src/cpg.erl#L789-L1107 . That was added into CloudI v1.3.0 (2013-10-20), showing it is much easier to not program with a committee.

@rickard-green (Contributor) commented:

@essen wrote:

> Any reasons why this (or pg2) doesn't include support for the via mechanism?

I guess that it has not been requested before, and, as @josevalim pointed out, it isn't obvious what strategy to use. If implementing via as part of pg, you would probably need to implement a range of different strategies to make everyone happy. This makes me think it is better to leave this to the user. The user should be able to implement their own specific strategy quite easily, I guess.
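
For example, a user-supplied via callback on top of pg could look roughly like this (hypothetical module name pg_via, picking a random member per call, and assuming the default pg scope is running):

```erlang
%% Hypothetical user-side 'via' callback built on pg; the random-member
%% strategy is just one of many possible choices.
-module(pg_via).
-export([register_name/2, unregister_name/1, whereis_name/1, send/2]).

register_name(Group, Pid) ->          %% called by the newly started process
    ok = pg:join(Group, Pid),         %% Pid is local, so pg accepts it
    yes.

unregister_name(Group) ->
    ok = pg:leave(Group, self()).

whereis_name(Group) ->
    case pg:get_members(Group) of
        []   -> undefined;
        Pids -> lists:nth(rand:uniform(length(Pids)), Pids)
    end.

send(Group, Msg) ->
    case whereis_name(Group) of
        undefined -> exit({badarg, {Group, Msg}});
        Pid       -> Pid ! Msg, Pid
    end.
```

gen_server:call({via, pg_via, Group}, Request) would then reach a random member; round-robin, local-only, or broadcast strategies would each need their own callback module, which is why leaving this to the user seems reasonable.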

@AeroNotix (Contributor) commented Feb 13, 2020

@max-au I would like to draw your attention to a recent addition to a process group implementation which claims you stole their code:

We all know where this came from; I will not be drawn into giving this abusive user air time by linking to their repository.

> cpg provides a process group interface that is focused on availability and partition tolerance (in the CAP theorem). The pg process group implementation added in Erlang/OTP 23 by WhatsApp Inc. (Facebook Inc.) is based on cpg.

@max-au deleted the max-au/pg branch on August 28, 2021
Labels: enhancement, team:VM (Assigned to OTP team VM)
9 participants