source: xas/trunk/lib/XAS/Lib/Net/POE/Client.pm @ 356a9c26c4b3ecd30059f983907d4b00a270c27e

Revision 356a9c26c4b3ecd30059f983907d4b00a270c27e, 10.7 KB checked in by Kevin L. Esteb <kevin@…>, 6 years ago (diff)

Fixed some typos

  • Property mode set to 100644
Line 
1package XAS::Lib::Net::POE::Client;
2
3our $VERSION = '0.02';
4
5use POE;
6use Try::Tiny;
7use Socket ':all';
8use Errno ':POSIX';
9use POE::Filter::Line;
10use POE::Wheel::ReadWrite;
11use POE::Wheel::SocketFactory;
12
13use XAS::Class
14  debug     => 0,
15  version   => $VERSION,
16  base      => 'XAS::Lib::POE::Service',
17  mixin     => 'XAS::Lib::Mixins::Keepalive',
18  accessors => 'wheel host port listener socket',
19  utils     => 'dotid',
20  vars => {
21    PARAMS => {
22      -host            => 1,
23      -port            => 1,
24      -retry_reconnect => { optional => 1, default => 1 },
25      -tcp_keepalive   => { optional => 1, default => 0 },
26      -filter          => { optional => 1, default => undef },
27      -alias           => { optional => 1, default => 'client' },
28      -eol             => { optional => 1, default => "\012\015" },
29    }
30  }
31;
32
33our @ERRORS = (0, EPIPE, ETIMEDOUT, ECONNRESET, ECONNREFUSED);
34our @RECONNECTIONS = (60, 120, 240, 480, 960, 1920, 3840);
35
36#use Data::Dumper;
37
38# ----------------------------------------------------------------------
39# Public Events
40# ----------------------------------------------------------------------
41
42# ---------------------------------------------------------------------
43# Public Methods
44# ---------------------------------------------------------------------
45
46sub session_initialize {
47    my $self = shift;
48
49    my $alias = $self->alias;
50
51    $self->log->debug("$alias: entering session_initialize()");
52
53    # private events
54
55    $self->log->debug("$alias: doing private events");
56
57    # private events
58
59    $poe_kernel->state('server_error',     $self, '_server_error');
60    $poe_kernel->state('server_message',   $self, '_server_message');
61    $poe_kernel->state('server_connect',   $self, '_server_connect');
62    $poe_kernel->state('server_connected', $self, '_server_connected');
63    $poe_kernel->state('server_reconnect', $self, '_server_reconnect');
64    $poe_kernel->state('server_connection_failed', $self, '_server_connection_failed');
65
66    # public events
67
68    $self->log->debug("$alias: doing public events");
69
70    $poe_kernel->state('read_data',         $self);
71    $poe_kernel->state('write_data',        $self);
72    $poe_kernel->state('connection_up',     $self);
73    $poe_kernel->state('connection_down',   $self);
74    $poe_kernel->state('handle_connection', $self);
75
76    # walk the chain
77
78    $self->SUPER::session_initialize();
79
80    $self->log->debug("$alias: leaving session_initialize()");
81
82}
83
84sub session_startup {
85    my ($self) = $_[OBJECT];
86
87    my $alias = $self->alias;
88
89    $self->log->debug("$alias: entering session_startup");
90
91    $poe_kernel->post($alias, 'server_connect');
92
93    # walk the chain
94
95    $self->SUPER::session_startup();
96
97    $self->log->debug("$alias: leaving session_startup");
98
99}
100
101sub session_shutdown {
102    my $self = shift;
103   
104    my $alias = $self->alias;
105
106    $self->log->debug("$alias: entering session_shutdown");
107
108    $self->{socket}   = undef;
109    $self->{wheel}    = undef;
110    $self->{listener} = undef;
111
112    # walk the chain
113
114    $self->SUPER::session_shutdown();
115
116    $self->log->debug("$alias: leaving session_shutdown");
117   
118}
119
120sub session_pause {
121    my ($self) = $_[OBJECT];
122
123    my $alias = $self->alias;
124
125    $self->log->debug("$alias: entering session_pause");
126
127    $poe_kernel->call($alias, 'connection_down');
128
129    # walk the chain
130
131    $self->SUPER::session_pause();
132
133    $self->log->debug("$alias: leaving session_pause");
134
135}
136
137sub session_resume {
138    my ($self) = $_[OBJECT];
139
140    my $alias = $self->alias;
141
142    $self->log->debug("$alias: entering session_resume");
143
144    $poe_kernel->call($alias, 'connection_up');
145
146    # walk the chain
147
148    $self->SUPER::session_resume();
149
150    $self->log->debug("$alias: leaving session_resume");
151
152}
153
154# ---------------------------------------------------------------------
155# Public Events
156# ---------------------------------------------------------------------
157
158sub handle_connection {
159    my ($self) = $_[OBJECT];
160
161}
162
163sub connection_down {
164    my ($self) = $_[OBJECT];
165
166}
167
168sub connection_up {
169    my ($self) = $_[OBJECT];
170
171}
172
173sub read_data {
174    my ($self, $data) = @_[OBJECT, ARG0];
175
176    my $alias = $self->alias;
177
178    $poe_kernel->post($alias, 'write_data', $data);
179
180}
181
182sub write_data {
183    my ($self, $data) = @_[OBJECT, ARG0];
184
185    my @packet;
186    my $alias = $self->alias;
187
188    if (my $wheel = $self->wheel) {
189
190        push(@packet, $data);
191        $wheel->put(@packet);
192
193    } else {
194
195        $self->throw_msg(
196            dotid($self->class) . '.write_data.nowheel',
197            'nowheel',
198            $alias
199        );
200
201    }
202
203}
204
205# ---------------------------------------------------------------------
206# Private Events
207# ---------------------------------------------------------------------
208
209sub _server_message {
210    my ($self, $data, $wheel_id) = @_[OBJECT, ARG0, ARG1];
211
212    my $alias = $self->alias;
213
214    $self->log->debug("$alias: _server_message()");
215
216    $poe_kernel->post($alias, 'read_data', $data);
217
218}
219
220sub _server_connected {
221    my ($self, $socket, $peeraddr, $peerport, $wheel_id) = @_[OBJECT,ARG0..ARG3];
222
223    my $alias = $self->alias;
224
225    $self->log->debug("$alias: _server_connected()");
226
227    my $wheel = POE::Wheel::ReadWrite->new(
228        Handle     => $socket,
229        Filter     => $self->filter,
230        InputEvent => 'server_message',
231        ErrorEvent => 'server_error',
232    );
233
234    my $host = gethostbyaddr($peeraddr, AF_INET);
235
236    $self->{host}     = $host;
237    $self->{port}     = $peerport;
238    $self->{wheel}    = $wheel;
239    $self->{socket}   = $socket;
240    $self->{attempts} = 0;
241
242    $poe_kernel->post($alias, 'handle_connection');
243
244}
245
246sub _server_connect {
247    my ($self) = $_[OBJECT];
248
249    my $alias = $self->alias;
250
251    $self->log->debug("$alias: _server_connect()");
252
253    $self->{listner} = POE::Wheel::SocketFactory->new(
254        RemoteAddress  => $self->host,
255        RemotePort     => $self->port,
256        SocketType     => SOCK_STREAM,
257        SocketDomain   => AF_INET,
258        Reuse          => 'no',
259        SocketProtocol => 'tcp',
260        SuccessEvent   => 'server_connected',
261        FailureEvent   => 'server_connection_failed',
262    );
263
264}
265
266sub _server_connection_failed {
267    my ($self, $operation, $errnum, $errstr, $wheel_id) = @_[OBJECT,ARG0..ARG3];
268
269    my $alias = $self->alias;
270
271    $self->log->debug("$alias: _server_connection_failed()");
272    $self->log->error_msg('server_connection_failed', $alias, $operation, $errnum, $errstr);
273
274    delete $self->{socket};
275    delete $self->{listner};
276    delete $self->{wheel};
277
278    foreach my $error (@ERRORS) {
279
280        if ($errnum == $error) {
281
282            $poe_kernel->post($alias, 'server_reconnect');
283            last;
284
285        }
286
287    }
288
289}
290
291sub _server_error {
292    my ($self, $operation, $errnum, $errstr, $wheel_id) = @_[OBJECT,ARG0..ARG3];
293
294    my $alias = $self->alias;
295
296    $self->log->debug("$alias: _server_error()");
297    $self->log->error_msg('server_error', $alias, $operation, $errnum, $errstr);
298
299    delete $self->{socket};
300    delete $self->{listner};
301    delete $self->{wheel};
302
303    $poe_kernel->post($alias, 'connection_down');
304
305    foreach my $error (@ERRORS) {
306
307        if ($errnum == $error) {
308
309            $poe_kernel->post($alias, 'server_reconnect');
310            last;
311
312        }
313
314    }
315
316}
317
318sub _server_reconnect {
319    my ($self) = $_[OBJECT];
320
321    my $retry;
322    my $alias = $self->alias;
323
324    $self->log->warn_msg('server_reconnect', $alias, $self->{attempts}, $self->{count});
325
326    if ($self->{attempts} < $self->{count}) {
327
328        my $delay = $RECONNECTIONS[$self->{attempts}];
329        $self->log->warn_msg('server_attempts', $alias, $self->{attempts}, $delay);
330        $self->{attempts} += 1;
331        $poe_kernel->delay('server_connect', $delay);
332
333    } else {
334
335        $retry = $self->retry_reconnect || 0;
336
337        if ($retry) {
338
339            $self->log->warn_msg('server_recycle', $alias);
340            $self->{attempts} = 0;
341            $poe_kernel->post($alias, 'server_connect');
342
343        } else {
344
345            $self->log->warn_msg('server_shutdown', $alias);
346            $poe_kernel->post($alias, 'session_shutdown');
347
348        }
349
350    }
351
352}
353
354# ---------------------------------------------------------------------
355# Private Methods
356# ---------------------------------------------------------------------
357
358sub init {
359    my $class = shift;
360
361    my $self = $class->SUPER::init(@_);
362
363    $self->{attempts} = 0;
364    $self->{count} = scalar(@RECONNECTIONS);
365
366    unless (defined($self->{filter})) {
367
368        $self->{filter} = POE::Filter::Line->new(
369            InputLiteral  => $self->eol,
370            OutputLiteral => $self->eol,
371        );
372
373    }
374
375    return $self;
376
377}
378
3791;
380
381__END__
382
383=head1 NAME
384
385XAS::Lib::Net::POE::Client - An asynchronous network client based on POE
386
387=head1 SYNOPSIS
388
389This module is a class used to create network clients.
390
391 package Client;
392
393 use POE;
394 use XAS::Class
395   version => '1.0',
396   base    => 'XAS::Lib::Net::POE::Client'
397 ;
398
399 sub handle_connection {
400    my ($self) = $_[OBJECT];
401
402    my $packet = "hello!";
403
404    $poe_kernel->yield('write_data', $packet);
405
406 }
407
408=head1 DESCRIPTION
409
410This module handles the nitty-gritty details of setting up the communications
411channel to a server. You will need to sub-class this module with your own for
412it to be useful.
413
414An attempt to maintain that channel will be made when/if that server should
415happen to disappear off the network. There is nothing more unpleasant then
416having to go around to dozens of servers and restarting processes.
417
418=head1 METHODS
419
420=head2 new
421
422This method initializes the class and starts a session to handle the
423communications channel. It takes the following parameters:
424
425=over 4
426
427=item B<-alias>
428
429The session alias, defaults to 'client'.
430
431=item B<-server>
432
433The servers host name.
434
435=item B<-port>
436
437The servers port number.
438
439=item B<-retry_count>
440
441Wither to attempt reconnections after they run out. Defaults to true.
442
443=item B<-tcp_keepalive>
444
445For those pesky firewalls, defaults to false
446
447=back
448
449=head2 read_data
450
451This event is triggered when data is received for the server.
452
453=head2 write_data
454
455You use this event to send data to the server.
456
457=head2 handle_connection
458
459This event is triggered upon initial connection to the server.
460
461=head2 connection_down
462
463This event is triggered to allow you to be notified if
464the connection to the server is currently down.
465
466=head2 connection_up
467
468This event is triggered to allow you to be notified when the connection
469to the server is restored.
470
471=head1 SEE ALSO
472
473=over 4
474
475=item L<XAS|XAS>
476
477=back
478
479=head1 AUTHOR
480
481Kevin L. Esteb, E<lt>kevin@kesteb.usE<gt>
482
483=head1 COPYRIGHT AND LICENSE
484
485Copyright (C) 2014 Kevin L. Esteb
486
487This library is free software; you can redistribute it and/or modify
488it under the same terms as Perl itself, either Perl version 5.8.8 or,
489at your option, any later version of Perl 5 you may have available.
490
491See L<http://dev.perl.org/licenses/> for more information.
492
493=cut
Note: See TracBrowser for help on using the repository browser.