source: xas/trunk/lib/XAS/Lib/Net/POE/Client.pm @ 50a4331c5c06e744e72b3851c9403eb7ac64d3de

Revision 50a4331c5c06e744e72b3851c9403eb7ac64d3de, 10.1 KB checked in by Kevin L. Esteb <kevin@…>, 6 years ago (diff)

Updating network server socket handling.

  • Property mode set to 100644
Line 
1package XAS::Lib::Net::POE::Client;
2
3our $VERSION = '0.02';
4
5use POE;
6use Try::Tiny;
7use Socket ':all';
8use Errno ':POSIX';
9use POE::Filter::Line;
10use POE::Wheel::ReadWrite;
11use POE::Wheel::SocketFactory;
12
13use XAS::Class
14  debug     => 0,
15  version   => $VERSION,
16  base      => 'XAS::Lib::POE::Service',
17  mixin     => 'XAS::Lib::Mixins::Keepalive',
18  accessors => 'wheel host port listener socket',
19  utils     => 'dotid',
20  vars => {
21    PARAMS => {
22      -host            => 1,
23      -port            => 1,
24      -retry_reconnect => { optional => 1, default => 1 },
25      -tcp_keepalive   => { optional => 1, default => 0 },
26      -filter          => { optional => 1, default => undef },
27      -alias           => { optional => 1, default => 'client' },
28      -eol             => { optional => 1, default => "\012\015" },
29    }
30  }
31;
32
33our @ERRORS = (0, EPIPE, ETIMEDOUT, ECONNRESET, ECONNREFUSED);
34our @RECONNECTIONS = (60, 120, 240, 480, 960, 1920, 3840);
35
36#use Data::Dumper;
37
38# ----------------------------------------------------------------------
39# Public Events
40# ----------------------------------------------------------------------
41
42# ---------------------------------------------------------------------
43# Public Methods
44# ---------------------------------------------------------------------
45
46sub session_intialize {
47    my $self = shift;
48
49    my $alias = $self->alias;
50
51    $self->log->debug("$alias: entering session_initialize()");
52
53    # private events
54
55    $self->log->debug("$alias: doing private events");
56
57    # private events
58
59    $poe_kernel->state('server_error',     $self, '_server_error');
60    $poe_kernel->state('server_message',   $self, '_server_message');
61    $poe_kernel->state('server_connect',   $self, '_server_connect');
62    $poe_kernel->state('server_connected', $self, '_server_connected');
63    $poe_kernel->state('server_reconnect', $self, '_server_reconnect');
64    $poe_kernel->state('server_connection_failed', $self, '_server_connection_failed');
65
66    # public events
67
68    $self->log->debug("$alias: doing public events");
69
70    $poe_kernel->state('read_data',         $self);
71    $poe_kernel->state('write_data',        $self);
72    $poe_kernel->state('connection_up',     $self);
73    $poe_kernel->state('connection_down',   $self);
74    $poe_kernel->state('handle_connection', $self);
75
76    # walk the chain
77
78    $self->SUPER::session_initialize();
79
80    $self->log->debug("$alias: leaving session_initialize()");
81
82}
83
84sub session_startup {
85    my ($self) = $_[OBJECT];
86
87    my $alias = $self->alias;
88
89    $self->log->debug("$alias: session_startup");
90
91    $poe_kernel->post($alias, 'server_connect');
92
93}
94
95sub session_shutdown {
96    my $self = shift;
97   
98    $self->{socket}   = undef;
99    $self->{wheel}    = undef;
100    $self->{listener} = undef;
101
102    # walk the chain
103
104    $self->SUPER::session_shutdown();
105
106}
107
108sub session_pause {
109    my ($self) = $_[OBJECT];
110
111    my $alias = $self->alias;
112
113    $self->log->debug("$alias: session_pause");
114
115    $poe_kernel->call($alias, 'connection_down');
116
117}
118
119sub session_resume {
120    my ($self) = $_[OBJECT];
121
122    my $alias = $self->alias;
123
124    $self->log->debug("$alias: session_resume");
125
126    $poe_kernel->call($alias, 'connection_up');
127
128}
129
130# ---------------------------------------------------------------------
131# Public Events
132# ---------------------------------------------------------------------
133
134sub handle_connection {
135    my ($self) = $_[OBJECT];
136
137}
138
139sub connection_down {
140    my ($self) = $_[OBJECT];
141
142}
143
144sub connection_up {
145    my ($self) = $_[OBJECT];
146
147}
148
149sub read_data {
150    my ($self, $data) = @_[OBJECT, ARG0];
151
152    my $alias = $self->alias;
153
154    $poe_kernel->post($alias, 'write_data', $data);
155
156}
157
158sub write_data {
159    my ($self, $data) = @_[OBJECT, ARG0];
160
161    my @packet;
162    my $alias = $self->alias;
163
164    if (my $wheel = $self->wheel) {
165
166        push(@packet, $data);
167        $wheel->put(@packet);
168
169    } else {
170
171        $self->throw_msg(
172            dotid($self->class) . '.write_data.nowheel',
173            'nowheel',
174            $alias
175        );
176
177    }
178
179}
180
181# ---------------------------------------------------------------------
182# Private Events
183# ---------------------------------------------------------------------
184
185sub _server_message {
186    my ($self, $data, $wheel_id) = @_[OBJECT, ARG0, ARG1];
187
188    my $alias = $self->alias;
189
190    $self->log->debug("$alias: _server_message()");
191
192    $poe_kernel->post($alias, 'read_data', $data);
193
194}
195
196sub _server_connected {
197    my ($self, $socket, $peeraddr, $peerport, $wheel_id) = @_[OBJECT,ARG0..ARG3];
198
199    my $alias = $self->alias;
200
201    $self->log->debug("$alias: _server_connected()");
202
203    my $wheel = POE::Wheel::ReadWrite->new(
204        Handle     => $socket,
205        Filter     => $self->filter,
206        InputEvent => 'server_message',
207        ErrorEvent => 'server_error',
208    );
209
210    my $host = gethostbyaddr($peeraddr, AF_INET);
211
212    $self->{host}     = $host;
213    $self->{port}     = $peerport;
214    $self->{wheel}    = $wheel;
215    $self->{socket}   = $socket;
216    $self->{attempts} = 0;
217
218    $poe_kernel->post($alias, 'handle_connection');
219
220}
221
222sub _server_connect {
223    my ($self) = $_[OBJECT];
224
225    my $alias = $self->alias;
226
227    $self->log->debug("$alias: _server_connect()");
228
229    $self->{listner} = POE::Wheel::SocketFactory->new(
230        RemoteAddress  => $self->host,
231        RemotePort     => $self->port,
232        SocketType     => SOCK_STREAM,
233        SocketDomain   => AF_INET,
234        Reuse          => 'no',
235        SocketProtocol => 'tcp',
236        SuccessEvent   => 'server_connected',
237        FailureEvent   => 'server_connection_failed',
238    );
239
240}
241
242sub _server_connection_failed {
243    my ($self, $operation, $errnum, $errstr, $wheel_id) = @_[OBJECT,ARG0..ARG3];
244
245    my $alias = $self->alias;
246
247    $self->log->debug("$alias: _server_connection_failed()");
248    $self->log->error_msg('server_connection_failed', $alias, $operation, $errnum, $errstr);
249
250    delete $self->{socket};
251    delete $self->{listner};
252    delete $self->{wheel};
253
254    foreach my $error (@ERRORS) {
255
256        if ($errnum == $error) {
257
258            $poe_kernel->post($alias, 'server_reconnect');
259            last;
260
261        }
262
263    }
264
265}
266
267sub _server_error {
268    my ($self, $operation, $errnum, $errstr, $wheel_id) = @_[OBJECT,ARG0..ARG3];
269
270    my $alias = $self->alias;
271
272    $self->log->debug("$alias: _server_error()");
273    $self->log->error_msg('server_error', $alias, $operation, $errnum, $errstr);
274
275    delete $self->{socket};
276    delete $self->{listner};
277    delete $self->{wheel};
278
279    $poe_kernel->post($alias, 'connection_down');
280
281    foreach my $error (@ERRORS) {
282
283        if ($errnum == $error) {
284
285            $poe_kernel->post($alias, 'server_reconnect');
286            last;
287
288        }
289
290    }
291
292}
293
294sub _server_reconnect {
295    my ($self) = $_[OBJECT];
296
297    my $retry;
298    my $alias = $self->alias;
299
300    $self->log->warn_msg('server_reconnect', $alias, $self->{attempts}, $self->{count});
301
302    if ($self->{attempts} < $self->{count}) {
303
304        my $delay = $RECONNECTIONS[$self->{attempts}];
305        $self->log->warn_msg('server_attempts', $alias, $self->{attempts}, $delay);
306        $self->{attempts} += 1;
307        $poe_kernel->delay('server_connect', $delay);
308
309    } else {
310
311        $retry = $self->retry_reconnect || 0;
312
313        if ($retry) {
314
315            $self->log->warn_msg('server_recycle', $alias);
316            $self->{attempts} = 0;
317            $poe_kernel->post($alias, 'server_connect');
318
319        } else {
320
321            $self->log->warn_msg('server_shutdown', $alias);
322            $poe_kernel->post($alias, 'session_shutdown');
323
324        }
325
326    }
327
328}
329
330# ---------------------------------------------------------------------
331# Private Methods
332# ---------------------------------------------------------------------
333
334sub init {
335    my $class = shift;
336
337    my $self = $class->SUPER::init(@_);
338
339    $self->{attempts} = 0;
340    $self->{count} = scalar(@RECONNECTIONS);
341
342    unless (defined($self->{filter})) {
343
344        $self->{filter} = POE::Filter::Line->new(
345            InputLiteral  => $self->eol,
346            OutputLiteral => $self->eol,
347        );
348
349    }
350
351    return $self;
352
353}
354
3551;
356
357__END__
358
359=head1 NAME
360
361XAS::Lib::Net::POE::Client - An asynchronous network client based on POE
362
363=head1 SYNOPSIS
364
365This module is a class used to create network clients.
366
367 package Client;
368
369 use POE;
370 use XAS::Class
371   version => '1.0',
372   base    => 'XAS::Lib::Net::POE::Client'
373 ;
374
375 sub handle_connection {
376    my ($self) = $_[OBJECT];
377
378    my $packet = "hello!";
379
380    $poe_kernel->yield('write_data', $packet);
381
382 }
383
384=head1 DESCRIPTION
385
386This module handles the nitty-gritty details of setting up the communications
387channel to a server. You will need to sub-class this module with your own for
388it to be useful.
389
390An attempt to maintain that channel will be made when/if that server should
391happen to disappear off the network. There is nothing more unpleasant then
392having to go around to dozens of servers and restarting processes.
393
394=head1 METHODS
395
396=head2 new
397
398This method initializes the class and starts a session to handle the
399communications channel. It takes the following parameters:
400
401=over 4
402
403=item B<-alias>
404
405The session alias, defaults to 'client'.
406
407=item B<-server>
408
409The servers host name.
410
411=item B<-port>
412
413The servers port number.
414
415=item B<-retry_count>
416
417Wither to attempt reconnections after they run out. Defaults to true.
418
419=item B<-tcp_keepalive>
420
421For those pesky firewalls, defaults to false
422
423=back
424
425=head2 read_data
426
427This event is triggered when data is received for the server.
428
429=head2 write_data
430
431You use this event to send data to the server.
432
433=head2 handle_connection
434
435This event is triggered upon initial connection to the server.
436
437=head2 connection_down
438
439This event is triggered to allow you to be notified if
440the connection to the server is currently down.
441
442=head2 connection_up
443
444This event is triggered to allow you to be notified when the connection
445to the server is restored.
446
447=head1 SEE ALSO
448
449=over 4
450
451=item L<XAS|XAS>
452
453=back
454
455=head1 AUTHOR
456
457Kevin L. Esteb, E<lt>kevin@kesteb.usE<gt>
458
459=head1 COPYRIGHT AND LICENSE
460
461Copyright (C) 2014 Kevin L. Esteb
462
463This library is free software; you can redistribute it and/or modify
464it under the same terms as Perl itself, either Perl version 5.8.8 or,
465at your option, any later version of Perl 5 you may have available.
466
467See L<http://dev.perl.org/licenses/> for more information.
468
469=cut
Note: See TracBrowser for help on using the repository browser.