source: xas/trunk/lib/XAS/Lib/Stomp/POE/Client.pm @ 356a9c26c4b3ecd30059f983907d4b00a270c27e

Revision 356a9c26c4b3ecd30059f983907d4b00a270c27e, 12.8 KB checked in by Kevin L. Esteb <kevin@…>, 6 years ago (diff)

Fixed some typos

  • Property mode set to 100644
Line 
1package XAS::Lib::Stomp::POE::Client;
2
3our $VERSION = '0.03';
4
5use POE;
6use Try::Tiny;
7use XAS::Lib::Stomp::Utils;
8use XAS::Lib::Stomp::POE::Filter;
9
10use XAS::Class
11  debug     => 0,
12  version   => $VERSION,
13  base      => 'XAS::Lib::Net::POE::Client',
14  mixins    => 'XAS::Lib::Mixins::Keepalive',
15  accessors => 'stomp',
16  vars => {
17    PARAMS => {
18      -alias    => { optional => 1, default => 'stomp-client' },
19      -host     => { optional => 1, default => 'localhost' },
20      -port     => { optional => 1, default => 61613 },
21      -login    => { optional => 1, default => 'guest' },
22      -passcode => { optional => 1, default => 'guest' },
23      -target   => { optional => 1, default => '1.0', regex => qr/(1\.0|1\.1|1\.2)/ },
24    }
25  }
26;
27
28#use Data::Dumper;
29
30# ----------------------------------------------------------------------
31# Public Events
32# ----------------------------------------------------------------------
33
34# ---------------------------------------------------------------------
35# Public Methods
36# ---------------------------------------------------------------------
37
38sub session_initialize {
39    my $self = shift;
40
41    my $alias = $self->alias;
42
43    $self->log->debug("$alias: session_initialize()");
44
45    # public events
46
47    $self->log->debug("$alias: doing public events");
48
49    $poe_kernel->state('handle_noop',       $self);
50    $poe_kernel->state('handle_error',      $self);
51    $poe_kernel->state('handle_message',    $self);
52    $poe_kernel->state('handle_receipt',    $self);
53    $poe_kernel->state('handle_connected',  $self);
54
55    # walk the chain
56
57    $self->SUPER::session_initialize();
58
59    $self->log->debug("$alias: leaving session_initialize()");
60
61}
62
63sub session_shutdown {
64    my $self = shift;
65
66    my $alias = $self->alias;
67    my $frame = $self->stomp->disconnect(
68        -receipt => 'disconnecting'
69    );
70
71    $self->log->debug("$alias: entering session_shutdown()");
72
73    $poe_kernel->call($alias, 'write_data', $frame);
74
75    # walk the chain
76
77    $self->SUPER::session_shutdown();
78
79    $self->log->debug("$alias: leaving session_shutdown()");
80
81}
82
83# ---------------------------------------------------------------------
84# Public Events
85# ---------------------------------------------------------------------
86
87sub handle_connection {
88    my ($self) = $_[OBJECT];
89
90    my $alias = $self->alias;
91    my $frame = $self->stomp->connect(
92        -login    => $self->login,
93        -passcode => $self->passcode
94    );
95
96    $self->log->debug("$alias: entering handle_connection()");
97
98    $poe_kernel->post($alias, 'write_data', $frame);
99
100    $self->log->debug("$alias: leaving handle_connection()");
101
102}
103
104sub handle_connected {
105    my ($self, $frame) = @_[OBJECT, ARG0];
106
107    my $alias = $self->alias;
108
109    $self->log->debug("$alias: entering handle_connected()");
110
111    if ($self->tcp_keepalive) {
112
113        $self->log->info("$alias: tcp_keepalive enabled");
114
115        $self->init_keepalive();
116        $self->enable_keepalive($self->socket);
117
118    }
119
120    $self->log->info_msg('connected', $alias, $self->host, $self->port);
121
122    $poe_kernel->post($alias, 'connection_up');
123
124    $self->log->debug("$alias: leaving handle_connected()");
125
126}
127
128sub handle_message {
129    my ($self, $frame) = @_[OBJECT, ARG0];
130
131}
132
133sub handle_receipt {
134    my ($self, $frame) = @_[OBJECT, ARG0];
135
136}
137
138sub handle_error {
139    my ($self, $frame) = @_[OBJECT, ARG0];
140
141    my $alias = $self->alias;
142
143    $self->log->error_msg('errors',
144        $alias,
145        $frame->header->message_id,
146        $frame->header->message,
147        $frame->body
148    );
149
150}
151
152sub handle_noop {
153    my ($self, $frame) = @_[OBJECT, ARG0];
154
155    my $alias = $self->alias;
156
157    $self->log->debug("$alias: handle_noop()");
158
159}
160
161# ---------------------------------------------------------------------
162# Private Events
163# ---------------------------------------------------------------------
164
165sub _server_message {
166    my ($self, $frame, $wheel_id) = @_[OBJECT, ARG0, ARG1];
167
168    my $alias = $self->alias;
169
170    $self->log->debug("$alias: entering _server_message()");
171
172    if ($frame->command eq 'CONNECTED') {
173
174        $self->log->debug("$alias: received a \"CONNECTED\" message");
175        $poe_kernel->post($alias, 'handle_connected', $frame);
176
177    } elsif ($frame->command eq 'MESSAGE') {
178
179        $self->log->debug("$alias: received a \"MESSAGE\" message");
180        $poe_kernel->post($alias, 'handle_message', $frame);
181
182    } elsif ($frame->command eq 'RECEIPT') {
183
184        $self->log->debug("$alias: received a \"RECEIPT\" message");
185        $poe_kernel->post($alias, 'handle_receipt', $frame);
186
187    } elsif ($frame->command eq 'ERROR') {
188
189        $self->log->debug("$alias: received an \"ERROR\" message");
190        $poe_kernel->post($alias, 'handle_error', $frame);
191
192    } elsif ($frame->command eq 'NOOP') {
193
194        $self->log->debug("$alias: received an \"NOOP\" message");
195        $poe_kernel->post($alias, 'handle_noop', $frame);
196
197    } else {
198
199        $self->log->warn("$alias: unknown message type: $frame->command");
200
201    }
202
203    $self->log->debug("$alias: leaving _server_message()");
204
205}
206
207# ---------------------------------------------------------------------
208# Private Methods
209# ---------------------------------------------------------------------
210
211sub init {
212    my $class = shift;
213
214    my $self = $class->SUPER::init(@_);
215
216    $self->{stomp} = XAS::Lib::Stomp::Utils->new();
217
218    unless (defined($self->{filter})) {
219
220        $self->{filter} = XAS::Lib::Stomp::POE::Filter->new(
221            -target => $self->target
222        );
223
224    }
225
226    return $self;
227
228}
229
2301;
231
232__END__
233
234=head1 NAME
235
236XAS::Lib::Stomp::POE::Client - A STOMP client for the POE Environment
237
238=head1 SYNOPSIS
239
240This module is a class used to create clients that need to access a
241message server that communicates with the STOMP protocol. Your program could
242look as follows:
243
244 package Client;
245
246 use POE;
247 use XAS::Class
248   version => '1.0',
249   base    => 'XAS::Lib::Stomp::POE::Client',
250 ;
251
252 sub handle_connection {
253    my ($self) = @_[OBJECT];
254 
255    my $nframe = $self->stomp->connect(
256        -login    => 'testing',
257        -passcode => 'testing'
258    );
259
260    $poe_kernel->yield('send_data', $nframe);
261
262 }
263
264 sub handle_connected {
265    my ($self, $frame) = @_[OBJECT, ARG0];
266
267    my $nframe = $self->stomp->subscribe(
268        -queue => $self->queue,
269        -ack   => 'client'
270    );
271
272    $poe_kernel->yield('send_data', $nframe);
273
274 }
275 
276 sub handle_message {
277    my ($self, $frame) = @_[OBJECT, ARG0];
278
279    my $nframe = $self->stomp->ack(
280       -message_id => $frame->header->message_id
281    );
282
283    $poe_kernel->yield('send_data', $nframe);
284
285 }
286
287 package main;
288
289 use POE;
290 use strict;
291
292 Client->new(
293    -alias => 'testing',
294    -queue => '/queue/testing',
295 );
296
297 $poe_kernel->run();
298
299 exit 0;
300
301
302=head1 DESCRIPTION
303
304This module handles the nitty-gritty details of setting up the communications
305channel to a message queue server. You will need to sub-class this module
306with your own for it to be useful.
307
308An attempt to maintain that channel will be made when/if that server should
309happen to disappear off the network. There is nothing more unpleasant then
310having to go around to dozens of servers and restarting processes.
311
312When messages are received, specific events are generated. Those events are
313based on the message type. If you are interested in those events you should
314override the default behavior for those events. The default behavior is to
315do nothing.
316
317=head1 METHODS
318
319=head2 new
320
321This method initializes the class and starts a session to handle the
322communications channel. It takes the following parameters:
323
324=over 4
325
326=item B<-alias>
327
328The session alias, defaults to 'stomp-client'.
329
330=item B<-server>
331
332The servers host name, defaults to 'localhost'.
333
334=item B<-port>
335
336The servers port number, defaults to '61613'.
337
338=item B<-target>
339
340The STOMP protocol version that is targeted. Defaults to '1.0'.
341
342=item B<-retry_count>
343
344Wither to attempt reconnections after they run out. Defaults to true.
345
346=item B<-enable_keepalive>
347
348For those pesky firewalls, defaults to false
349
350=back
351
352=head2 send_data
353
354You use this event to send Stomp frames to the server.
355
356=over 4
357
358=item Example
359
360 $kernel->yield('send_data', $frame);
361
362=back
363
364=head2 handle_connection
365
366This event is signaled and the corresponding method is called upon initial
367connection to the message server. For the most part you should send a
368"CONNECT" frame to the server.
369
370 Example
371
372    sub handle_connection {
373        my ($self) = @_[$OBJECT];
374 
375       my $nframe = $self->stomp->connect(
376           -login => 'testing',
377           -passcode => 'testing'
378       );
379
380       $poe_kernel->yield('send_data', $nframe);
381
382    }
383
384=head2 handled_connected
385
386This event and corresponding method is called when a "CONNECT" frame is
387received from the server. This means the server will allow you to start
388generating/processing frames.
389
390 Example
391
392    sub handle_connected {
393        my ($self, $frame) = @_[OBJECT,ARG0];
394
395        my $nframe = $self->stomp->subscribe(
396            -queue => $self->queue,
397            -ack => 'client'
398        );
399
400        $poe_kernel->yield('send_data', $nframe);
401
402    }
403
404This example shows you how to subscribe to a particular queue. The queue name
405was passed as a parameter to new().
406
407=head2 handle_message
408
409This event and corresponding method is used to process "MESSAGE" frames.
410
411 Example
412
413    sub handle_message {
414        my ($self, $frame) = @_[OBJECT,ARG0];
415 
416        my $nframe = $self->stomp->ack(
417            -message_id => $frame->header->message_id
418        );
419
420        $poe_kernel->yield('send_data', $nframe);
421
422    }
423
424This example really doesn't do much other then "ack" the messages that are
425received.
426
427=head2 handle_receipt
428
429This event and corresponding method is used to process "RECEIPT" frames.
430
431 Example
432
433    sub handle_receipt {
434        my ($self, $frame) = @_[OBJECT,ARG0];
435
436        my $receipt = $frame->header->receipt;
437
438    }
439
440This example really doesn't do much, and you really don't need to worry about
441receipts unless you ask for one when you send a frame to the server. So this
442method could be safely left with the default.
443
444=head2 handle_error
445
446This event and corresponding method is used to process "ERROR" frames.
447
448 Example
449
450    sub handle_error {
451        my ($self, $frame) = @_[OBJECT,ARG0];
452 
453    }
454
455This example really doesn't do much. Error handling is pretty much what the
456process needs to do when something unexpected happens.
457
458=head2 handle_noop
459
460This event and corresponding method is used to process "NOOP" frames.
461
462 Example
463
464    sub handle_noop {
465        my ($self, $frame) = @_[OBJECT,ARG0];
466 
467    }
468
469This example really doesn't do much.
470
471=head2 gather_data
472
473This event and corresponding method is used to "gather data". How that is done
474is up to your program. But usually a "send_data" event is generated.
475
476 Example
477
478    sub gather_data {
479        my ($self) = @_[OBJECT];
480 
481        # doing something here
482
483        $poe_kernel->yield('send_data', $frame);
484
485    }
486
487=head2 connection_down
488
489This event and corresponding method is a hook to allow you to be notified if
490the connection to the server is currently down. By default it does nothing.
491But it would be useful to notify "gather_data" to temporarily stop doing
492whatever it is currently doing.
493
494 Example
495
496    sub connection_down {
497        my ($self) = @_[OBJECT];
498
499        # do something here
500
501    }
502
503=head2 connection_up
504
505This event and corresponding method is a hook to allow you to be notified
506when the connection to the server up. By default it does nothing.
507But it would be useful to notify "gather_data" to start doing
508whatever it supposed to do.
509
510 Example
511
512    sub connection_up {
513       my ($self) = @_[OBJECT];
514
515       # do something here
516
517    }
518
519=head2 session_cleanup
520
521This method is a hook and should be overridden to do "shutdown" stuff. By
522default it sends a "DISCONNECT" message to the message queue server.
523
524 Example
525
526    sub session_cleanup {
527        my $self = shift;
528
529        # do something here
530
531        $self->SUPER::session_cleanup();
532
533    }
534
535=head2 session_reload
536
537This method is a hook and should be overridden to do "reload" stuff. By
538default it executes POE's sig_handled() method.
539
540 Example
541
542    sub session_reload {
543        my $self = shift;
544
545        $poe_kernel->sig_handled();
546
547    }
548
549=head1 ACCESSORS
550
551=head2 stomp
552
553This returns an object to the internal XAS::Lib::Stomp::Utils
554object. This is very useful for creating STOMP frames.
555
556 Example
557
558    $frame = $self->stomp->connect(
559         -login    => 'testing',
560         -passcode => 'testing'
561    );
562
563    $poe_kernel->yield('send_data', $frame);
564
565=head1 SEE ALSO
566
567=over 4
568
569=item L<XAS|XAS>
570
571=back
572
573 For details on the protocol see L<http://stomp.github.io/>.
574
575=head1 AUTHOR
576
577Kevin L. Esteb, E<lt>=[@kesteb.usE<gt>
578
579=head1 COPYRIGHT AND LICENSE
580
581Copyright (C) 2014 Kevin L. Esteb
582
583This library is free software; you can redistribute it and/or modify
584it under the same terms as Perl itself, either Perl version 5.8.8 or,
585at your option, any later version of Perl 5 you may have available.
586
587See L<http://dev.perl.org/licenses/> for more information.
588
589=cut
Note: See TracBrowser for help on using the repository browser.