Overview

Namespaces

  • Avisota
    • Event
    • Message
    • Queue
    • Recipient
    • RecipientSource
    • Renderer
    • Templating
    • Test
      • Database
      • Imap
      • Message
      • Queue
      • Renderer
      • Transport
    • Transport

Classes

  • ExecutionConfig
  • QueueHelper
  • SimpleDatabaseQueue

Interfaces

  • ArchivingQueueInterface
  • EventEmittingQueueInterface
  • ExecutionDeciderInterface
  • LoggingQueueInterface
  • MutableQueueInterface
  • QueueInterface
  • Overview
  • Namespace
  • Class
  • Tree
  • Deprecated
  • Todo
  • Download
  1: <?php
  2: 
  3: /**
  4:  * Avisota newsletter and mailing system
  5:  *
  6:  * PHP Version 5.3
  7:  *
  8:  * @copyright  bit3 UG 2013
  9:  * @author     Tristan Lins <tristan.lins@bit3.de>
 10:  * @package    avisota-core
 11:  * @license    LGPL-3.0+
 12:  * @link       http://avisota.org
 13:  */
 14: 
 15: namespace Avisota\Queue;
 16: 
 17: use Avisota\Event\PostTransportMessageEvent;
 18: use Avisota\Event\PreTransportMessageEvent;
 19: use Avisota\Message\MessageInterface;
 20: use Avisota\Message\NativeMessage;
 21: use Avisota\Transport\TransportInterface;
 22: use Avisota\Transport\TransportStatus;
 23: use Doctrine\DBAL\Connection;
 24: use Doctrine\DBAL\Schema\Schema;
 25: use Doctrine\DBAL\Statement;
 26: use Psr\Log\LoggerInterface;
 27: use Symfony\Component\EventDispatcher\EventDispatcher;
 28: 
 29: /**
 30:  * A simple single threaded queue storing the messages in a small database table.
 31:  *
 32:  * @package Avisota\Queue
 33:  */
 34: class SimpleDatabaseQueue
 35:     implements MutableQueueInterface, EventEmittingQueueInterface, LoggingQueueInterface
 36: {
 37:     static public function createTableSchema($tableName)
 38:     {
 39:         $schema = new Schema();
 40:         $table  = $schema->createTable($tableName);
 41:         $table->addColumn('id', 'integer', array('unsigned' => true, 'autoincrement' => true));
 42:         $table->addColumn('enqueue', 'datetime');
 43:         $table->addColumn('message', 'text');
 44:         $table->addColumn('delivery_date', 'datetime', array('notnull' => false));
 45:         $table->setPrimaryKey(array('id'));
 46:         return $schema;
 47:     }
 48: 
 49:     /**
 50:      * @var Connection
 51:      */
 52:     protected $connection;
 53: 
 54:     /**
 55:      * @var string
 56:      */
 57:     protected $tableName;
 58: 
 59:     /**
 60:      * @var LoggerInterface
 61:      */
 62:     protected $logger;
 63: 
 64:     /**
 65:      * @var EventDispatcher
 66:      */
 67:     protected $eventDispatcher;
 68: 
 69:     /**
 70:      * @param NativeMessage $messageSerializer      The message serializer.
 71:      * @param Connection    $connection             The database connection.
 72:      * @param string        $tableName              The name of the database table.
 73:      * @param bool          $createTableIfNotExists Create the table if not exists.
 74:      */
 75:     public function __construct(
 76:         Connection $connection,
 77:         $tableName,
 78:         $createTableIfNotExists = false,
 79:         LoggerInterface $logger = null,
 80:         EventDispatcher $eventDispatcher = null
 81:     ) {
 82:         $this->connection      = $connection;
 83:         $this->tableName       = (string) $tableName;
 84:         $this->logger          = $logger;
 85:         $this->eventDispatcher = $eventDispatcher;
 86: 
 87:         $schemaManager = $this->connection->getSchemaManager();
 88:         if (!$schemaManager->tablesExist($this->tableName)) {
 89:             if ($createTableIfNotExists) {
 90:                 $schema  = static::createTableSchema($this->tableName);
 91:                 $queries = $schema->toSql($this->connection->getDatabasePlatform());
 92:                 foreach ($queries as $query) {
 93:                     $this->connection->exec($query);
 94:                 }
 95:             }
 96:             else {
 97:                 throw new \RuntimeException('The queue table ' . $this->tableName . ' does not exists!');
 98:             }
 99:         }
100:     }
101: 
102:     /**
103:      * {@inheritdoc}
104:      */
105:     public function setEventDispatcher(EventDispatcher $eventDispatcher = null)
106:     {
107:         $this->eventDispatcher = $eventDispatcher;
108:         return $this;
109:     }
110: 
111:     /**
112:      * {@inheritdoc}
113:      */
114:     public function getEventDispatcher()
115:     {
116:         return $this->eventDispatcher;
117:     }
118: 
119:     /**
120:      * {@inheritdoc}
121:      */
122:     public function setLogger(LoggerInterface $logger = null)
123:     {
124:         $this->logger = $logger;
125:         return $this;
126:     }
127: 
128:     /**
129:      * {@inheritdoc}
130:      */
131:     public function getLogger()
132:     {
133:         return $this->logger;
134:     }
135: 
136:     /**
137:      * {@inheritdoc}
138:      */
139:     public function isEmpty()
140:     {
141:         return $this->length() == 0;
142:     }
143: 
144:     /**
145:      * {@inheritdoc}
146:      */
147:     public function length()
148:     {
149:         $queryBuilder = $this->connection->createQueryBuilder();
150:         /** @var Statement $statement */
151:         $statement = $queryBuilder
152:             ->select('COUNT(q.id)')
153:             ->from($this->tableName, 'q')
154:             ->execute();
155: 
156:         return (int) $statement->fetchColumn();
157:     }
158: 
159:     /**
160:      * {@inheritdoc}
161:      */
162:     public function getMessages()
163:     {
164:         /** @var Statement $statement */
165:         $statement = $this->connection
166:             ->createQueryBuilder()
167:             ->select('q.message')
168:             ->from($this->tableName, 'q')
169:             ->execute();
170: 
171:         $serializedMessages = $statement->fetchAll(\PDO::FETCH_COLUMN, 0);
172:         $messages           = array_map('unserialize', $serializedMessages);
173: 
174:         return $messages;
175:     }
176: 
177:     /**
178:      * {@inheritdoc}
179:      */
180:     public function execute(TransportInterface $transport, ExecutionConfig $config = null)
181:     {
182:         $timeout = $config && $config->getTimeLimit() > 0
183:             ? time() + $config->getTimeLimit()
184:             : PHP_INT_MAX;
185: 
186:         $resultSet = $this->selectRecords($config);
187: 
188:         $results = array();
189: 
190:         if ($config) {
191:             $decider = $config->getDecider();
192:         }
193:         else {
194:             $decider = null;
195:         }
196: 
197:         /**
198:          * Initialise transport system
199:          */
200:         $transport->initialise();
201: 
202:         $duration = 0;
203:         while (count($resultSet) && (time() + $duration) < $timeout) {
204:             $record = array_shift($resultSet);
205: 
206:             $duration = time();
207:             $status = $this->transport($transport, $decider, $record);
208:             $duration = time() - $duration;
209: 
210:             if ($status) {
211:                 $results[] = $status;
212:                 $this->connection->delete($this->tableName, array('id' => $record['id']));
213:             }
214:         }
215: 
216:         /**
217:          * Flush transport
218:          */
219:         $transport->flush();
220: 
221:         return $results;
222:     }
223: 
224:     /**
225:      * @return array[]
226:      */
227:     protected function selectRecords(ExecutionConfig $config = null)
228:     {
229:         $queryBuilder = $this->connection->createQueryBuilder();
230:         $queryBuilder
231:             ->select('q.*')
232:             ->from($this->tableName, 'q');
233: 
234:         if ($config) {
235:             if ($config->getMessageLimit() > 0) {
236:                 $queryBuilder->setMaxResults($config->getMessageLimit());
237:             }
238:         }
239: 
240:         /** @var Statement $statement */
241:         $statement = $queryBuilder->execute();
242:         $resultSet = $statement->fetchAll();
243: 
244:         return $resultSet;
245:     }
246: 
247:     /**
248:      * @param array $record
249:      *
250:      * @return MessageInterface
251:      */
252:     protected function deserializeMessage(array $record)
253:     {
254:         try {
255:             return unserialize($record['message']);
256:         }
257:         catch (\Exception $e) {
258:             // log failed transport
259:             if ($this->logger) {
260:                 $this->logger->error(
261:                     sprintf(
262:                         'Could not deserialize message "%s": %s',
263:                         $record['message'],
264:                         $e->getMessage()
265:                     )
266:                 );
267:             }
268: 
269:             $this->connection->delete($this->tableName, array('id' => $record['id']));
270: 
271:             return false;
272:         }
273:     }
274: 
275:     /**
276:      * Do the transport of the message and create a status information object.
277:      *
278:      * @param TransportInterface $transport
279:      * @param MessageInterface   $message
280:      *
281:      * @return TransportStatus
282:      */
283:     protected function transport(
284:         TransportInterface $transport,
285:         ExecutionDeciderInterface $decider = null,
286:         $record
287:     ) {
288:         if ($record['delivery_date']) {
289:             $deliveryDate = \DateTime::createFromFormat('Y-m-d H:i:s', $record['delivery_date']);
290:             if ($deliveryDate->getTimestamp() > time()) {
291:                 return false;
292:             }
293:         }
294: 
295:         $message = $this->deserializeMessage($record);
296: 
297:         // skip message
298:         if (!$message || $decider && !$decider->accept($message)) {
299:             return false;
300:         }
301: 
302:         // log pre transport
303:         $this->logPreTransportStatus($transport, $message);
304: 
305:         try {
306:             // try to transport message
307:             $status = $transport->send($message);
308: 
309:             // log successful transport
310:             $this->logSuccessfulStatus($transport, $message, $status);
311:         }
312:         catch (\Exception $e) {
313:             $status = new TransportStatus(
314:                 $message,
315:                 0,
316:                 array_merge(
317:                     $message->getRecipients(),
318:                     $message->getCopyRecipients(),
319:                     $message->getBlindCopyRecipients()
320:                 ),
321:                 $e
322:             );
323: 
324:             // log failed transport
325:             $this->logFailedStatus($transport, $message, $status);
326:         }
327: 
328:         return $status;
329:     }
330: 
331:     protected function prepareRecipientsForLogging(array $recipients)
332:     {
333:         $recipientNames = array();
334:         foreach ($recipients as $email => $name) {
335:             if (is_numeric($email)) {
336:                 $recipientNames[] = $name;
337:             }
338:             else {
339:                 $recipientNames[] = $email;
340:             }
341:         }
342:         return implode(', ', $recipientNames);
343:     }
344: 
345:     protected function logPreTransportStatus(TransportInterface $transport, MessageInterface $message)
346:     {
347:         if ($this->logger) {
348:             $recipients = $this->prepareRecipientsForLogging($message->getRecipients());
349:             $this->logger->error(
350:                 sprintf(
351:                     'Begin transport of message "%s" to %s via transport "%s"',
352:                     $message->getSubject(),
353:                     $recipients,
354:                     get_class($transport)
355:                 ),
356:                 array(
357:                     'message' => $message,
358:                 )
359:             );
360:         }
361: 
362:         if ($this->eventDispatcher) {
363:             $this->eventDispatcher->dispatch(
364:                 'avisota_queue_execution_pre_transport',
365:                 new PreTransportMessageEvent($message, $this)
366:             );
367:         }
368:     }
369: 
370:     protected function logSuccessfulStatus(
371:         TransportInterface $transport,
372:         MessageInterface $message,
373:         TransportStatus $status
374:     ) {
375:         if ($this->logger) {
376:             $recipients = $this->prepareRecipientsForLogging($message->getRecipients());
377:             if ($status->getSuccessfullySend() > 0 && count($status->getFailedRecipients()) > 0) {
378:                 $failedRecipients = $this->prepareRecipientsForLogging($status->getFailedRecipients());
379:                 $this->logger->warning(
380:                     sprintf(
381:                         'Transport message "%s" to %s via transport "%s" partial succeeded, failed for: %s with %s',
382:                         $message->getSubject(),
383:                         $recipients,
384:                         get_class($transport),
385:                         $failedRecipients,
386:                         $status->getException() ? $status
387:                             ->getException()
388:                             ->getMessage() : 'no message'
389:                     ),
390:                     array(
391:                         'message'   => $message,
392:                         'status'    => $status,
393:                         'exception' => $status->getException(),
394:                     )
395:                 );
396:             }
397:             else if (count($status->getFailedRecipients()) > 0) {
398:                 $failedRecipients = $this->prepareRecipientsForLogging($status->getFailedRecipients());
399:                 $this->logger->error(
400:                     sprintf(
401:                         'Transport message "%s" to %s via transport "%s" failed for: %s with %s',
402:                         $message->getSubject(),
403:                         $recipients,
404:                         get_class($transport),
405:                         $failedRecipients,
406:                         $status->getException() ? $status
407:                             ->getException()
408:                             ->getMessage() : 'no message'
409:                     ),
410:                     array(
411:                         'message'   => $message,
412:                         'status'    => $status,
413:                         'exception' => $status->getException(),
414:                     )
415:                 );
416:             }
417:             else {
418:                 $this->logger->debug(
419:                     sprintf(
420:                         'Transport message "%s" to %s via transport "%s" succeed',
421:                         $message->getSubject(),
422:                         $recipients,
423:                         get_class($transport)
424:                     ),
425:                     array(
426:                         'message' => $message,
427:                         'status'  => $status,
428:                     )
429:                 );
430:             }
431:         }
432: 
433:         if ($this->eventDispatcher) {
434:             $this->eventDispatcher->dispatch(
435:                 'avisota_queue_execution_post_transport',
436:                 new PostTransportMessageEvent($message, $this, $status)
437:             );
438:         }
439:     }
440: 
441:     protected function logFailedStatus(
442:         TransportInterface $transport,
443:         MessageInterface $message,
444:         TransportStatus $status
445:     ) {
446:         if ($this->logger) {
447:             $recipients = $this->prepareRecipientsForLogging($message->getRecipients());
448:             $this->logger->error(
449:                 sprintf(
450:                     'Could not transport message "%s" to %s via transport "%s": %s',
451:                     $message->getSubject(),
452:                     $recipients,
453:                     get_class($transport),
454:                     $status
455:                         ->getException()
456:                         ->getMessage()
457:                 ),
458:                 array(
459:                     'message' => $message,
460:                 )
461:             );
462:         }
463: 
464:         if ($this->eventDispatcher) {
465:             $this->eventDispatcher->dispatch(
466:                 'avisota_queue_execution_post_transport',
467:                 new PostTransportMessageEvent($message, $this, $status)
468:             );
469:         }
470:     }
471: 
472:     /**
473:      * {@inheritdoc}
474:      */
475:     public function enqueue(MessageInterface $message, \DateTime $deliveryDate = null)
476:     {
477:         $affectedRows = $this->connection->insert(
478:             $this->tableName,
479:             array(
480:                 'enqueue'       => date('Y-m-d H:i:s'),
481:                 'message'       => serialize($message),
482:                 'delivery_date' => $deliveryDate
483:             )
484:         );
485: 
486:         return (bool) $affectedRows;
487:     }
488: 
489:     /**
490:      * {@inheritdoc}
491:      */
492:     public function dequeue(MessageInterface $message)
493:     {
494:         $affectedRows = $this->connection->delete(
495:             $this->tableName,
496:             array(
497:                 'message' => serialize($message),
498:             )
499:         );
500: 
501:         return (bool) $affectedRows;
502:     }
503: }
avisota/core API documentation generated by ApiGen 2.8.0