Recebendo dados de um único soquete UDP com mais de um processo erlang

Às vezes, pode haver uma tempestade de pacotes UDP atingindo seu aplicativo, mas a gen_udpdocumentação não fornece nenhuma pista sobre como equilibrar o processamento entre vários processos. Mas é possível compartilhar Socketentre os funcionários e chamar gen_udp:recvem um loop, portanto, o gargalo em potencial pode ser eliminado dessa maneira.

Aqui está um gen_servermanipulador baseado em amostra e algum código na parte inferior para iniciar instâncias de teste.

-module(udp_test).
-behaviour(gen_server).
-define(SERVER, ?MODULE).
-define(PORT, 9876).

%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------

-export([start_link/2, send/1,
test_stuff
/0, start_stuff/1]).

%% ------------------------------------------------------------------
%% gen_server Function Exports
%% ------------------------------------------------------------------

-export([init/1, handle_call/3, handle_cast/2,
handle_info
/2, terminate/2, code_change/3]).

-record(state, { id=0, sock }).

%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------

start_link
(ID, Sock) ->
gen_server
:start_link(?MODULE, [{id, ID}, {socket, Sock}], []).

%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------

init
(Args) ->
ID
= proplists:get_value(id, Args, 1),
case proplists:get_value(socket, Args) of
undefined ->
{stop, nosock};
Sock ->
{ok, #state{ id=ID, sock=Sock }, 0}
end.

handle_call
(_Request, _From, State) ->
{stop, {error, unknownmsg}, State}.

handle_cast
(_Request, State) ->
{stop, {error, unknownmsg}, State}.

handle_info
(timeout, #state{ sock=undefined } = State) ->
{noreply, State, 10};
handle_info
(timeout, #state{ id=ID, sock=Sock } = State) ->
TO
= case gen_udp:recv(Sock, 4) of
{error, _} -> 10;
_Data ->
catch ets:insert_new(?MODULE, {ID, 0}),
catch ets:update_counter(?MODULE, ID, 1)
end,
{noreply, State, TO}.

terminate
(_Reason, _State) ->
ok
.

code_change
(_OldVsn, State, _Extra) ->
{ok, State}.

%% ------------------------------------------------------------------
%% Test Function Definitions
%% ------------------------------------------------------------------

test_stuff
() ->
start_stuff
(10),
[udp_test:send("fooobaar") || _ <- lists:seq(1,1000)],
timer
:sleep(1000),
io
:format("Total: ~p~n", [ets:tab2list(?MODULE)]),
init
:stop().

start_stuff
(N) ->
ets
:new(?MODULE, [named_table, public]),
{ok, S} = gen_udp:open(?PORT, [{active, false}, binary]),
[start_link(ID, S) || ID <- lists:seq(1, N)].

send
(Data) ->
spawn
(fun() ->
{ok, S} = gen_udp:open(0, [{active, false}, binary]),
gen_udp
:send(S, localhost,?PORT, Data)
end).