parent
							
								
									8771e3c323
								
							
						
					
					
						commit
						880820af64
					
				@ -0,0 +1,93 @@ | 
				
			||||
package Lemonldap::NG::Common::MessageBroker::DBI; | 
				
			||||
 | 
				
			||||
use strict; | 
				
			||||
use DBI; | 
				
			||||
use JSON; | 
				
			||||
use String::Random qw/random_string/; | 
				
			||||
 | 
				
			||||
our $VERSION = '2.20.0'; | 
				
			||||
 | 
				
			||||
sub new { | 
				
			||||
    my ( $class, $conf, $logger ) = @_; | 
				
			||||
    my $args = $conf->{messageBrokerOptions}; | 
				
			||||
    unless ( $args and $args->{dbiChain} ) { | 
				
			||||
        $logger->error('MISSING OPTIONS FOR DBI PUB/SUB'); | 
				
			||||
        return undef; | 
				
			||||
    } | 
				
			||||
    my $self = bless { %{$args}, logger => $logger }, $class; | 
				
			||||
    return $self; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
sub publish { | 
				
			||||
    my ( $self, $channel, $msg ) = @_; | 
				
			||||
    die 'Not a hash msg' unless ref $msg eq 'HASH'; | 
				
			||||
    my $j   = eval { JSON::to_json($msg) }; | 
				
			||||
    my $now = time; | 
				
			||||
    my $old = time - 600; | 
				
			||||
    my @devices = | 
				
			||||
      $self->_dbh->selectrow_array( 'SELECT name FROM devices WHERE channel=?', | 
				
			||||
        undef, $channel ); | 
				
			||||
    foreach my $device (@devices) { | 
				
			||||
        $self->_dbh->do( | 
				
			||||
'INSERT INTO deviceinbox (device, time, channel, message) VALUES (?,?,?,?)', | 
				
			||||
            undef, $device, $now, $channel, $j | 
				
			||||
        ); | 
				
			||||
    } | 
				
			||||
    $self->_dbh->do("DELETE FROM deviceinbox WHERE time < $old"); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
sub subscribe { | 
				
			||||
    my ( $self, $channel ) = @_; | 
				
			||||
    $self->{messages}{$channel} = []; | 
				
			||||
    $self->{_name} ||= random_string( 's' x 30 ); | 
				
			||||
    $self->_dbh->do( | 
				
			||||
        'INSERT INTO devices (name, lastseen, channel) VALUES(?,?,?)', | 
				
			||||
        undef, $self->{_name}, time, $channel ); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
sub getNextMessage { | 
				
			||||
    my ( $self, $channel, $delay ) = @_; | 
				
			||||
    return undef unless $self->{messages}{$channel}; | 
				
			||||
    return shift @{ $self->{messages}{$channel} } | 
				
			||||
      if ( @{ $self->{messages}{$channel} } ); | 
				
			||||
    $self->_dbh->do( 'UPDATE devices SET lastseen=? WHERE name=?', | 
				
			||||
        undef, time, $self->{_name} ); | 
				
			||||
    my @msg = $self->_dbh->selectrow_array( | 
				
			||||
        'SELECT message FROM deviceinbox WHERE device=? AND channel=?', | 
				
			||||
        undef, $self->{_name}, $channel ); | 
				
			||||
    return unless @msg or @{ $self->{messages}{$channel} }; | 
				
			||||
    $self->_dbh->do( 'DELETE FROM deviceinbox WHERE device=? AND channel=?', | 
				
			||||
        undef, $self->{_name}, $channel ); | 
				
			||||
    foreach my $msg (@msg) { | 
				
			||||
        my $tmp = eval { JSON::from_json($msg) }; | 
				
			||||
        if ($@) { | 
				
			||||
            $self->{logger}->error("Bad message from DB: $@"); | 
				
			||||
        } | 
				
			||||
        else { | 
				
			||||
            push @{ $self->{messages}{$channel} }, $tmp; | 
				
			||||
        } | 
				
			||||
        $self->_dbh->do( 'DELETE FROM deviceinbox WHERE device=? AND channel=?', | 
				
			||||
            undef, $self->{_name}, $channel ); | 
				
			||||
    } | 
				
			||||
    return shift @{ $self->{messages}{$channel} }; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
sub waitForNextMessage { | 
				
			||||
    my ( $self, $channel ) = @_; | 
				
			||||
    return undef unless $self->{messages}{$channel}; | 
				
			||||
 | 
				
			||||
    # Infinite loop until one message is seen | 
				
			||||
    my $res; | 
				
			||||
    while ( not( $res = $self->getNextMessage($channel) ) ) { | 
				
			||||
        sleep 1; | 
				
			||||
    } | 
				
			||||
} | 
				
			||||
 | 
				
			||||
sub _dbh { | 
				
			||||
    my ($self) = @_; | 
				
			||||
    return $self->{_dbh} if ( $self->{_dbh} and $self->{_dbh}->ping ); | 
				
			||||
    $self->{_dbh} = DBI->connect_cached( $self->{dbiChain}, $self->{dbiUser}, | 
				
			||||
        $self->{dbiPassword}, { RaiseError => 1, AutoCommit => 1, } ); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
1; | 
				
			||||
@ -0,0 +1,118 @@ | 
				
			||||
use strict; | 
				
			||||
use Test::More; | 
				
			||||
use IO::String; | 
				
			||||
use Time::Fake; | 
				
			||||
 | 
				
			||||
our $noSqlite; | 
				
			||||
 | 
				
			||||
BEGIN { | 
				
			||||
    require 't/test-lib.pm'; | 
				
			||||
    eval { | 
				
			||||
        require DBI; | 
				
			||||
        require DBD::SQLite; | 
				
			||||
        require Lemonldap::NG::Common::MessageBroker::DBI; | 
				
			||||
    }; | 
				
			||||
    if ($@) { | 
				
			||||
        print STDERR $@; | 
				
			||||
        $noSqlite = $@; | 
				
			||||
    } | 
				
			||||
    else { | 
				
			||||
        use_ok('Lemonldap::NG::Common::MessageBroker::DBI'); | 
				
			||||
    } | 
				
			||||
} | 
				
			||||
 | 
				
			||||
SKIP: { | 
				
			||||
    skip( "One dep is missing: $noSqlite", 1 ) if $noSqlite; | 
				
			||||
    my $userdb = tempdb(); | 
				
			||||
    my $dbh    = DBI->connect("dbi:SQLite:dbname=$userdb"); | 
				
			||||
    $dbh->do( | 
				
			||||
'CREATE TABLE devices (name VARCHAR(1024), lastseen INT, channel VARCHAR(255))' | 
				
			||||
    ); | 
				
			||||
    $dbh->do( | 
				
			||||
'CREATE TABLE deviceinbox (device VARCHAR(255), time INT, channel VARCHAR(255), message VARCHAR(4096))' | 
				
			||||
    ); | 
				
			||||
    my $client = LLNG::Manager::Test->new( { | 
				
			||||
            ini => { | 
				
			||||
                messageBroker        => '::DBI', | 
				
			||||
                messageBrokerOptions => { | 
				
			||||
                    dbiChain => "dbi:SQLite:dbname=$userdb", | 
				
			||||
                }, | 
				
			||||
            } | 
				
			||||
        } | 
				
			||||
    ); | 
				
			||||
    my $pub = Lemonldap::NG::Common::MessageBroker::DBI->new( { | 
				
			||||
            messageBrokerOptions => { | 
				
			||||
                dbiChain => "dbi:SQLite:dbname=$userdb", | 
				
			||||
            }, | 
				
			||||
        } | 
				
			||||
    ); | 
				
			||||
 | 
				
			||||
    # Simple test to verify that unlog is well propagated inside portal | 
				
			||||
    subtest "Simple login/logout" => sub { | 
				
			||||
        my $id = $client->login('dwho'); | 
				
			||||
        $client->logout($id); | 
				
			||||
    }; | 
				
			||||
 | 
				
			||||
    # Test to verify that cache is cleaned when an unlog event is read | 
				
			||||
    # from pub/sub | 
				
			||||
    subtest "External logout" => sub { | 
				
			||||
        my $id = $client->login('dwho'); | 
				
			||||
        ok( | 
				
			||||
            unlink( | 
				
			||||
                $client->ini->{globalStorageOptions}->{Directory} . "/$id" | 
				
			||||
            ), | 
				
			||||
            'Delete session from global storage' | 
				
			||||
        ); | 
				
			||||
        my $sd = $client->ini->{globalStorageOptions}->{Directory}; | 
				
			||||
        note "Push unlog event"; | 
				
			||||
        $pub->publish( 'llng_events', { action => 'unlog', id => $id } ); | 
				
			||||
        Time::Fake->offset('+6s'); | 
				
			||||
        my $res; | 
				
			||||
        ok( $res = $client->_get( '/', cookie => "lemonldap=$id" ), | 
				
			||||
            'Try / after 6 seconds' ); | 
				
			||||
        expectReject($res); | 
				
			||||
    }; | 
				
			||||
 | 
				
			||||
    # Test to verify that: | 
				
			||||
    #  - unlog event only unlog the good id | 
				
			||||
    #  - session destroyed without event still exist in cache | 
				
			||||
    subtest "External logout with 2 ids" => sub { | 
				
			||||
        my $id  = $client->login('dwho'); | 
				
			||||
        my $id2 = $client->login('french'); | 
				
			||||
        ok( | 
				
			||||
            unlink( | 
				
			||||
                $client->ini->{globalStorageOptions}->{Directory} . "/$id" | 
				
			||||
            ), | 
				
			||||
            'Delete session from global storage' | 
				
			||||
        ); | 
				
			||||
        my $sd = $client->ini->{globalStorageOptions}->{Directory}; | 
				
			||||
        note "Push unlog event"; | 
				
			||||
        $pub->publish( 'llng_events', { action => 'unlog', id => $id } ); | 
				
			||||
 | 
				
			||||
        # 6+6 because time already updated to +6 | 
				
			||||
        Time::Fake->offset('+12s'); | 
				
			||||
        my $res; | 
				
			||||
        ok( $res = $client->_get( '/', cookie => "lemonldap=$id" ), | 
				
			||||
            'Try / after 6 seconds' ); | 
				
			||||
        expectReject($res); | 
				
			||||
        ok( | 
				
			||||
            unlink( | 
				
			||||
                $client->ini->{globalStorageOptions}->{Directory} . "/$id2" | 
				
			||||
            ), | 
				
			||||
            'Delete session from global storage' | 
				
			||||
        ); | 
				
			||||
        Time::Fake->offset('+18s'); | 
				
			||||
      TODO: { | 
				
			||||
            local $TODO = 'Cache may fail on CI'; | 
				
			||||
            ok( | 
				
			||||
                $res = $client->_get( '/', cookie => "lemonldap=$id2" ), | 
				
			||||
                'Try with unlogged user without event (still in cache)' | 
				
			||||
            ); | 
				
			||||
            expectOK($res); | 
				
			||||
        } | 
				
			||||
    }; | 
				
			||||
    unlink $userdb; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
clean_sessions(); | 
				
			||||
done_testing(); | 
				
			||||
					Loading…
					
					
				
		Reference in new issue