source: xas/trunk/lib/XAS/Lib/Stomp/POE/Client.pm @ c91280cb582532e42a6555898e9c3662f7a6c282

Revision c91280cb582532e42a6555898e9c3662f7a6c282, 12.2 KB checked in by Kevin L. Esteb <kevin@…>, 6 years ago (diff)

Fixed a couple of typos

  • Property mode set to 100644
Line 
1package XAS::Lib::Stomp::POE::Client;
2
3our $VERSION = '0.03';
4
5use POE;
6use Try::Tiny;
7use XAS::Lib::Stomp::Utils;
8use XAS::Lib::Stomp::POE::Filter;
9
# Class declaration: base class, keepalive mixin, the "stomp" accessor and
# the constructor parameters (all optional) together with their defaults.
use XAS::Class
  debug     => 0,
  version   => $VERSION,
  base      => 'XAS::Lib::Net::POE::Client',
  mixins    => 'XAS::Lib::Mixins::Keepalive',
  accessors => 'stomp',
  vars => {
    PARAMS => {
      -alias    => { optional => 1, default => 'stomp-client' },
      -host     => { optional => 1, default => 'localhost' },
      -port     => { optional => 1, default => 61613 },
      -login    => { optional => 1, default => 'guest' },
      -passcode => { optional => 1, default => 'guest' },
      # The pattern is anchored: unanchored it would also match any string
      # that merely contains a version number, e.g. '11.0' or '1.0x'.
      -target   => { optional => 1, default => '1.0', regex => qr/\A(?:1\.0|1\.1|1\.2)\z/ },
    }
  }
;
27
28#use Data::Dumper;
29
30# ----------------------------------------------------------------------
31# Public Events
32# ----------------------------------------------------------------------
33
34# ---------------------------------------------------------------------
35# Public Methods
36# ---------------------------------------------------------------------
37
# Registers this class's public STOMP event handlers with the running POE
# session and then lets the parent class perform its own initialization.
#
# The sub used to be spelled "session_initalize", while its own log
# messages and its SUPER:: call both use "session_initialize". With the
# misspelled name the override is presumably never dispatched by the base
# class (NOTE(review): confirm against XAS::Lib::Net::POE::Client), so the
# handle_* states below would never be registered.
sub session_initialize {
    my $self = shift;

    my $alias = $self->alias;

    $self->log->debug("$alias: session_initialize()");

    # public events

    $self->log->debug("$alias: doing public events");

    # Each state is registered against $self without an explicit method
    # name, exactly as the individual state() calls did before.
    my @events = qw(
        handle_noop
        handle_error
        handle_message
        handle_receipt
        handle_connected
    );

    $poe_kernel->state($_, $self) for @events;

    # walk the chain

    $self->SUPER::session_initialize();

    $self->log->debug("$alias: leaving session_initialize()");

}

# Backward-compatible alias for the historical misspelling, so that any
# existing caller of session_initalize() keeps working.
sub session_initalize { goto &session_initialize }
62
# Shutdown hook: builds a DISCONNECT frame that requests a receipt named
# 'disconnecting', hands it to the session's 'write_data' event and then
# continues with the parent class's shutdown.
sub session_shutdown {
    my ($self) = @_;

    my $session    = $self->alias;
    my $disconnect = $self->stomp->disconnect(-receipt => 'disconnecting');

    # call() is used here rather than post(), presumably so the frame is
    # handed over before the parent's shutdown runs.
    $poe_kernel->call($session, 'write_data', $disconnect);

    $self->SUPER::session_shutdown();
}
75
76# ---------------------------------------------------------------------
77# Public Events
78# ---------------------------------------------------------------------
79
# Event handler for the initial connection to the message server: builds a
# STOMP CONNECT frame from the configured login and passcode and posts it
# to the session's 'write_data' event.
sub handle_connection {
    my ($self) = $_[OBJECT];

    my $alias = $self->alias;

    # Frames are built through the "stomp" accessor (the
    # XAS::Lib::Stomp::Utils object created in init()). This class
    # declares no "frame" accessor, which the previous code called.
    my $frame = $self->stomp->connect(
        -login    => $self->login,
        -passcode => $self->passcode
    );

    $poe_kernel->post($alias, 'write_data', $frame);

}
92
# Event handler posted by _server_message() when a CONNECTED frame has
# arrived. Optionally switches on TCP keepalive for the socket, logs the
# connection and signals 'connection_up' to the session.
sub handle_connected {
    my $self = $_[OBJECT];

    my $alias = $self->alias;

    # keepalive is only set up when the tcp_keepalive flag is true
    if ($self->tcp_keepalive) {
        $self->log->info("$alias: tcp_keepalive enabled");
        $self->init_keepalive();
        $self->enable_keepalive($self->socket);
    }

    $self->log->info_msg(
        'connected',
        $alias,
        $self->host,
        $self->port,
    );

    # let the session know the connection is now usable
    $poe_kernel->post($alias, 'connection_up');
}
112
# Default handler for MESSAGE frames: unpacks its arguments and does
# nothing else. Sub-classes override this to process messages.
sub handle_message {
    my ($self, $frame) = ($_[OBJECT], $_[ARG0]);
}
117
# Default handler for RECEIPT frames: unpacks its arguments and does
# nothing else. Sub-classes override this when they request receipts.
sub handle_receipt {
    my ($self, $frame) = ($_[OBJECT], $_[ARG0]);
}
122
# Default handler for ERROR frames: logs the frame's message id, message
# header and body through the 'errors' message template.
sub handle_error {
    my ($self, $frame) = ($_[OBJECT], $_[ARG0]);

    my $session = $self->alias;
    my $logger  = $self->log;

    # Gathered in list context, just as they would be when written
    # directly inside the argument list of error_msg().
    my @details = (
        $session,
        $frame->header->message_id,
        $frame->header->message,
        $frame->body,
    );

    $logger->error_msg('errors', @details);
}
136
# Default handler for NOOP frames: unpacks its arguments and does nothing
# else. Sub-classes may override it.
sub handle_noop {
    my ($self, $frame) = ($_[OBJECT], $_[ARG0]);
}
141
142# ---------------------------------------------------------------------
143# Private Events
144# ---------------------------------------------------------------------
145
# Private event: receives a frame from the server and re-posts it to the
# public handler that matches the frame's command (CONNECTED, MESSAGE,
# RECEIPT, ERROR or NOOP). Any other command is logged as a warning.
#
# Arguments (POE style): OBJECT is $self, ARG0 is the received frame.
sub _server_message {
    my ($self, $frame) = @_[OBJECT, ARG0];

    my $alias = $self->alias;

    $self->log->debug("$alias: _server_message()");

    # Read the command once. A missing command is treated as an empty
    # string so that it ends up in the "unknown" branch without
    # uninitialized-value warnings.
    my $command = $frame->command;
    $command = '' unless defined $command;

    if ($command eq 'CONNECTED') {

        $self->log->debug("$alias: received a \"CONNECTED\" message");
        $poe_kernel->post($alias, 'handle_connected', $frame);

    } elsif ($command eq 'MESSAGE') {

        $self->log->debug("$alias: received a \"MESSAGE\" message");
        $poe_kernel->post($alias, 'handle_message', $frame);

    } elsif ($command eq 'RECEIPT') {

        $self->log->debug("$alias: received a \"RECEIPT\" message");
        $poe_kernel->post($alias, 'handle_receipt', $frame);

    } elsif ($command eq 'ERROR') {

        $self->log->debug("$alias: received an \"ERROR\" message");
        $poe_kernel->post($alias, 'handle_error', $frame);

    } elsif ($command eq 'NOOP') {

        $self->log->debug("$alias: received an \"NOOP\" message");
        $poe_kernel->post($alias, 'handle_noop', $frame);

    } else {

        # Method calls are not interpolated inside double-quoted strings:
        # "$frame->command" would print the stringified object followed by
        # the literal text "->command". Interpolate the value instead.
        $self->log->warn("$alias: unknown message type: $command");

    }

}
185
186# ---------------------------------------------------------------------
187# Private Methods
188# ---------------------------------------------------------------------
189
# Object initializer: runs the parent's init(), attaches a fresh
# XAS::Lib::Stomp::Utils object as the "stomp" accessor and, when no filter
# has been set yet, installs a STOMP filter for the configured -target
# protocol version. Returns the initialized object.
sub init {
    my $proto = shift;

    my $self = $proto->SUPER::init(@_);

    $self->{stomp} = XAS::Lib::Stomp::Utils->new();

    # keep a filter that is already in place
    if (!defined $self->{filter}) {
        $self->{filter} = XAS::Lib::Stomp::POE::Filter->new(
            -target => $self->target,
        );
    }

    return $self;
}
208
2091;
210
211__END__
212
213=head1 NAME
214
215XAS::Lib::Stomp::POE::Client - A STOMP client for the POE Environment
216
217=head1 SYNOPSIS
218
219This module is a class used to create clients that need to access a
220message server that communicates with the STOMP protocol. Your program could
221look as follows:
222
223 package Client;
224
225 use POE;
226 use XAS::Class
227   version => '1.0',
228   base    => 'XAS::Lib::Stomp::POE::Client',
229 ;
230
231 sub handle_connection {
232    my ($self) = @_[OBJECT];
233 
234    my $nframe = $self->stomp->connect(
235        -login    => 'testing',
236        -passcode => 'testing'
237    );
238
239    $poe_kernel->yield('send_data', $nframe);
240
241 }
242
243 sub handle_connected {
244    my ($self, $frame) = @_[OBJECT, ARG0];
245
246    my $nframe = $self->stomp->subscribe(
247        -queue => $self->queue,
248        -ack   => 'client'
249    );
250
251    $poe_kernel->yield('send_data', $nframe);
252
253 }
254 
255 sub handle_message {
256    my ($self, $frame) = @_[OBJECT, ARG0];
257
258    my $nframe = $self->stomp->ack(
259       -message_id => $frame->header->message_id
260    );
261
262    $poe_kernel->yield('send_data', $nframe);
263
264 }
265
266 package main;
267
268 use POE;
269 use strict;
270
271 Client->new(
272    -alias => 'testing',
273    -queue => '/queue/testing',
274 );
275
276 $poe_kernel->run();
277
278 exit 0;
279
280
281=head1 DESCRIPTION
282
283This module handles the nitty-gritty details of setting up the communications
284channel to a message queue server. You will need to sub-class this module
285with your own for it to be useful.
286
An attempt to maintain that channel will be made when/if that server should
happen to disappear off the network. There is nothing more unpleasant than
having to go around to dozens of servers and restarting processes.
290
291When messages are received, specific events are generated. Those events are
292based on the message type. If you are interested in those events you should
293override the default behavior for those events. The default behavior is to
294do nothing.
295
296=head1 METHODS
297
298=head2 new
299
300This method initializes the class and starts a session to handle the
301communications channel. It takes the following parameters:
302
303=over 4
304
305=item B<-alias>
306
307The session alias, defaults to 'stomp-client'.
308
=item B<-host>

The server's host name, defaults to 'localhost'.
312
313=item B<-port>
314
The server's port number, defaults to '61613'.
316
317=item B<-target>
318
319The STOMP protocol version that is targeted. Defaults to '1.0'.
320
321=item B<-retry_count>
322
Whether to attempt reconnections after they run out. Defaults to true.
324
325=item B<-enable_keepalive>
326
327For those pesky firewalls, defaults to false
328
329=back
330
331=head2 send_data
332
333You use this event to send Stomp frames to the server.
334
335=over 4
336
337=item Example
338
339 $kernel->yield('send_data', $frame);
340
341=back
342
343=head2 handle_connection
344
345This event is signaled and the corresponding method is called upon initial
346connection to the message server. For the most part you should send a
347"CONNECT" frame to the server.
348
349 Example
350
351    sub handle_connection {
        my ($self) = @_[OBJECT];
353 
354       my $nframe = $self->stomp->connect(
355           -login => 'testing',
356           -passcode => 'testing'
357       );
358
359       $poe_kernel->yield('send_data', $nframe);
360
361    }
362
=head2 handle_connected

This event and corresponding method is called when a "CONNECTED" frame is
received from the server. This means the server will allow you to start
generating/processing frames.
368
369 Example
370
371    sub handle_connected {
372        my ($self, $frame) = @_[OBJECT,ARG0];
373
374        my $nframe = $self->stomp->subscribe(
375            -queue => $self->queue,
376            -ack => 'client'
377        );
378
379        $poe_kernel->yield('send_data', $nframe);
380
381    }
382
383This example shows you how to subscribe to a particular queue. The queue name
384was passed as a parameter to new().
385
386=head2 handle_message
387
388This event and corresponding method is used to process "MESSAGE" frames.
389
390 Example
391
392    sub handle_message {
393        my ($self, $frame) = @_[OBJECT,ARG0];
394 
395        my $nframe = $self->stomp->ack(
396            -message_id => $frame->header->message_id
397        );
398
399        $poe_kernel->yield('send_data', $nframe);
400
401    }
402
This example really doesn't do much other than "ack" the messages that are
received.
405
406=head2 handle_receipt
407
408This event and corresponding method is used to process "RECEIPT" frames.
409
410 Example
411
412    sub handle_receipt {
413        my ($self, $frame) = @_[OBJECT,ARG0];
414
415        my $receipt = $frame->header->receipt;
416
417    }
418
419This example really doesn't do much, and you really don't need to worry about
420receipts unless you ask for one when you send a frame to the server. So this
421method could be safely left with the default.
422
423=head2 handle_error
424
425This event and corresponding method is used to process "ERROR" frames.
426
427 Example
428
429    sub handle_error {
430        my ($self, $frame) = @_[OBJECT,ARG0];
431 
432    }
433
434This example really doesn't do much. Error handling is pretty much what the
435process needs to do when something unexpected happens.
436
437=head2 handle_noop
438
439This event and corresponding method is used to process "NOOP" frames.
440
441 Example
442
443    sub handle_noop {
444        my ($self, $frame) = @_[OBJECT,ARG0];
445 
446    }
447
448This example really doesn't do much.
449
450=head2 gather_data
451
452This event and corresponding method is used to "gather data". How that is done
453is up to your program. But usually a "send_data" event is generated.
454
455 Example
456
457    sub gather_data {
458        my ($self) = @_[OBJECT];
459 
460        # doing something here
461
462        $poe_kernel->yield('send_data', $frame);
463
464    }
465
466=head2 connection_down
467
468This event and corresponding method is a hook to allow you to be notified if
469the connection to the server is currently down. By default it does nothing.
470But it would be useful to notify "gather_data" to temporarily stop doing
471whatever it is currently doing.
472
473 Example
474
475    sub connection_down {
476        my ($self) = @_[OBJECT];
477
478        # do something here
479
480    }
481
482=head2 connection_up
483
This event and corresponding method is a hook to allow you to be notified
when the connection to the server is up. By default it does nothing.
But it would be useful to notify "gather_data" to start doing
whatever it is supposed to do.
488
489 Example
490
491    sub connection_up {
492       my ($self) = @_[OBJECT];
493
494       # do something here
495
496    }
497
498=head2 session_cleanup
499
500This method is a hook and should be overridden to do "shutdown" stuff. By
501default it sends a "DISCONNECT" message to the message queue server.
502
503 Example
504
505    sub session_cleanup {
506        my $self = shift;
507
508        # do something here
509
510        $self->SUPER::session_cleanup();
511
512    }
513
514=head2 session_reload
515
516This method is a hook and should be overridden to do "reload" stuff. By
517default it executes POE's sig_handled() method.
518
519 Example
520
521    sub session_reload {
522        my $self = shift;
523
524        $poe_kernel->sig_handled();
525
526    }
527
528=head1 ACCESSORS
529
530=head2 stomp
531
532This returns an object to the internal XAS::Lib::Stomp::Utils
533object. This is very useful for creating STOMP frames.
534
535 Example
536
537    $frame = $self->stomp->connect(
538         -login    => 'testing',
539         -passcode => 'testing'
540    );
541
542    $poe_kernel->yield('send_data', $frame);
543
544=head1 SEE ALSO
545
546=over 4
547
548=item L<XAS|XAS>
549
550=back
551
For details on the protocol see L<http://stomp.github.io/>.
553
554=head1 AUTHOR
555
Kevin L. Esteb, E<lt>kevin@kesteb.usE<gt>
557
558=head1 COPYRIGHT AND LICENSE
559
560Copyright (C) 2014 Kevin L. Esteb
561
562This library is free software; you can redistribute it and/or modify
563it under the same terms as Perl itself, either Perl version 5.8.8 or,
564at your option, any later version of Perl 5 you may have available.
565
566See L<http://dev.perl.org/licenses/> for more information.
567
568=cut
Note: See TracBrowser for help on using the repository browser.