1: <?php
2:
3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13:
14:
15: namespace Avisota\Queue;
16:
17: use Avisota\Event\PostTransportMessageEvent;
18: use Avisota\Event\PreTransportMessageEvent;
19: use Avisota\Message\MessageInterface;
20: use Avisota\Message\NativeMessage;
21: use Avisota\Transport\TransportInterface;
22: use Avisota\Transport\TransportStatus;
23: use Doctrine\DBAL\Connection;
24: use Doctrine\DBAL\Schema\Schema;
25: use Doctrine\DBAL\Statement;
26: use Psr\Log\LoggerInterface;
27: use Symfony\Component\EventDispatcher\EventDispatcher;
28:
29: 30: 31: 32: 33:
/**
 * Queue that stores serialized messages in a single database table which is
 * accessed through a Doctrine DBAL connection.
 *
 * Each row holds the time the message was enqueued, the serialized message
 * and an optional delivery date before which the message is not transported.
 */
class SimpleDatabaseQueue
    implements MutableQueueInterface, EventEmittingQueueInterface, LoggingQueueInterface
{
    /**
     * Build the schema that describes the queue table.
     *
     * @param string $tableName Name of the queue table.
     *
     * @return Schema Schema containing only the queue table.
     */
    static public function createTableSchema($tableName)
    {
        $schema = new Schema();
        $table  = $schema->createTable($tableName);
        $table->addColumn('id', 'integer', array('unsigned' => true, 'autoincrement' => true));
        $table->addColumn('enqueue', 'datetime');
        $table->addColumn('message', 'text');
        $table->addColumn('delivery_date', 'datetime', array('notnull' => false));
        $table->setPrimaryKey(array('id'));
        return $schema;
    }

    /**
     * Database connection used for all queue operations.
     *
     * @var Connection
     */
    protected $connection;

    /**
     * Name of the queue table.
     *
     * @var string
     */
    protected $tableName;

    /**
     * Optional logger; nothing is logged while this is null.
     *
     * @var LoggerInterface|null
     */
    protected $logger;

    /**
     * Optional event dispatcher; no events are dispatched while this is null.
     *
     * @var EventDispatcher|null
     */
    protected $eventDispatcher;

    /**
     * @param Connection           $connection             Database connection.
     * @param string               $tableName              Name of the queue table.
     * @param bool                 $createTableIfNotExists Create the table when it is missing
     *                                                     instead of throwing.
     * @param LoggerInterface|null $logger                 Optional logger.
     * @param EventDispatcher|null $eventDispatcher        Optional event dispatcher.
     *
     * @throws \RuntimeException When the table is missing and must not be created.
     */
    public function __construct(
        Connection $connection,
        $tableName,
        $createTableIfNotExists = false,
        LoggerInterface $logger = null,
        EventDispatcher $eventDispatcher = null
    ) {
        $this->connection      = $connection;
        $this->tableName       = (string) $tableName;
        $this->logger          = $logger;
        $this->eventDispatcher = $eventDispatcher;

        $schemaManager = $this->connection->getSchemaManager();
        if ($schemaManager->tablesExist($this->tableName)) {
            return;
        }

        if (!$createTableIfNotExists) {
            throw new \RuntimeException('The queue table ' . $this->tableName . ' does not exists!');
        }

        // Late static binding so that a subclass may provide its own schema.
        $schema  = static::createTableSchema($this->tableName);
        $queries = $schema->toSql($this->connection->getDatabasePlatform());
        foreach ($queries as $query) {
            $this->connection->exec($query);
        }
    }

    /**
     * Set or remove (null) the event dispatcher.
     *
     * @param EventDispatcher|null $eventDispatcher
     *
     * @return static This queue, for chaining.
     */
    public function setEventDispatcher(EventDispatcher $eventDispatcher = null)
    {
        $this->eventDispatcher = $eventDispatcher;
        return $this;
    }

    /**
     * @return EventDispatcher|null The current event dispatcher, if any.
     */
    public function getEventDispatcher()
    {
        return $this->eventDispatcher;
    }

    /**
     * Set or remove (null) the logger.
     *
     * @param LoggerInterface|null $logger
     *
     * @return static This queue, for chaining.
     */
    public function setLogger(LoggerInterface $logger = null)
    {
        $this->logger = $logger;
        return $this;
    }

    /**
     * @return LoggerInterface|null The current logger, if any.
     */
    public function getLogger()
    {
        return $this->logger;
    }

    /**
     * @return bool True when the queue table contains no rows.
     */
    public function isEmpty()
    {
        return $this->length() === 0;
    }

    /**
     * Count all rows in the queue table, whether or not they are due yet.
     *
     * @return int
     */
    public function length()
    {
        $statement = $this->connection
            ->createQueryBuilder()
            ->select('COUNT(q.id)')
            ->from($this->tableName, 'q')
            ->execute();

        return (int) $statement->fetchColumn();
    }

    /**
     * Load and unserialize every message stored in the queue table.
     *
     * @return array One entry per row, each the result of unserialize().
     */
    public function getMessages()
    {
        $statement = $this->connection
            ->createQueryBuilder()
            ->select('q.message')
            ->from($this->tableName, 'q')
            ->execute();

        $serializedMessages = $statement->fetchAll(\PDO::FETCH_COLUMN, 0);

        return array_map('unserialize', $serializedMessages);
    }

    /**
     * Transport queued messages until the selected records are used up or the
     * configured time limit would be exceeded.
     *
     * A record is deleted as soon as its transport produced a status object,
     * which is the case for successful sends and for sends that threw an
     * exception. Records that were skipped stay in the table.
     *
     * @param TransportInterface   $transport Transport used to send the messages.
     * @param ExecutionConfig|null $config    Optional limits and decider.
     *
     * @return array The status objects of all transported messages.
     */
    public function execute(TransportInterface $transport, ExecutionConfig $config = null)
    {
        $timeout = $config && $config->getTimeLimit() > 0
            ? time() + $config->getTimeLimit()
            : PHP_INT_MAX;

        $resultSet = $this->selectRecords($config);
        $results   = array();
        $decider   = $config ? $config->getDecider() : null;

        $transport->initialise();

        // Duration of the previous transport; the loop stops when one more
        // transport of the same duration would run past the time limit.
        $duration = 0;
        while (count($resultSet) && (time() + $duration) < $timeout) {
            $record = array_shift($resultSet);

            $startedAt = time();
            $status    = $this->transport($transport, $decider, $record);
            $duration  = time() - $startedAt;

            if ($status) {
                $results[] = $status;
                $this->connection->delete($this->tableName, array('id' => $record['id']));
            }
        }

        $transport->flush();

        return $results;
    }

    /**
     * Select the records that are candidates for transport.
     *
     * Only records without a delivery date, or whose delivery date has been
     * reached, are selected, oldest id first. Without this filter a message
     * limit could be filled entirely by records that are not due yet, and
     * deliverable records behind them would never be reached.
     *
     * @param ExecutionConfig|null $config Optional config providing the message limit.
     *
     * @return array Rows of the queue table as returned by fetchAll().
     */
    protected function selectRecords(ExecutionConfig $config = null)
    {
        $queryBuilder = $this->connection->createQueryBuilder();
        $queryBuilder
            ->select('q.*')
            ->from($this->tableName, 'q')
            ->where('q.delivery_date IS NULL OR q.delivery_date <= :now')
            // Same textual format that enqueue() writes into the table.
            ->setParameter('now', date('Y-m-d H:i:s'))
            ->orderBy('q.id', 'ASC');

        if ($config && $config->getMessageLimit() > 0) {
            $queryBuilder->setMaxResults($config->getMessageLimit());
        }

        return $queryBuilder->execute()->fetchAll();
    }

    /**
     * Unserialize the message of a record.
     *
     * When the payload cannot be turned into a message the failure is logged,
     * the record is deleted from the queue table and false is returned.
     *
     * @param array $record Row of the queue table; reads 'message' and 'id'.
     *
     * @return MessageInterface|false
     */
    protected function deserializeMessage(array $record)
    {
        try {
            $message = unserialize($record['message']);

            if ($message instanceof MessageInterface) {
                return $message;
            }

            // unserialize() reports corrupt input by returning false rather
            // than throwing, so this case has to be detected explicitly.
            $reason = 'payload is not a serialized message';
        }
        catch (\Exception $e) {
            $reason = $e->getMessage();
        }

        if ($this->logger) {
            $this->logger->error(
                sprintf(
                    'Could not deserialize message "%s": %s',
                    $record['message'],
                    $reason
                )
            );
        }

        // A broken record can never be transported; drop it so it does not
        // stay in the queue forever.
        $this->connection->delete($this->tableName, array('id' => $record['id']));

        return false;
    }

    /**
     * Transport the message of a single record.
     *
     * @param TransportInterface             $transport Transport used to send the message.
     * @param ExecutionDeciderInterface|null $decider   Optional decider that may reject the message.
     * @param array                          $record    Row of the queue table.
     *
     * @return TransportStatus|false The status returned by the transport, a failure status
     *                               when the transport threw an exception, or false when
     *                               the record was skipped.
     */
    protected function transport(
        TransportInterface $transport,
        ExecutionDeciderInterface $decider = null,
        $record
    ) {
        if ($record['delivery_date']) {
            $deliveryDate = \DateTime::createFromFormat('Y-m-d H:i:s', $record['delivery_date']);

            // createFromFormat() returns false when the stored value does not
            // match the format; fall back to strtotime() instead of calling a
            // method on false. A value neither can parse is treated as due.
            $dueAt = $deliveryDate
                ? $deliveryDate->getTimestamp()
                : strtotime($record['delivery_date']);

            if ($dueAt !== false && $dueAt > time()) {
                return false;
            }
        }

        $message = $this->deserializeMessage($record);

        if (!$message || $decider && !$decider->accept($message)) {
            return false;
        }

        $this->logPreTransportStatus($transport, $message);

        try {
            $status = $transport->send($message);

            $this->logSuccessfulStatus($transport, $message, $status);
        }
        catch (\Exception $e) {
            // Report all recipients of the message as failed.
            $status = new TransportStatus(
                $message,
                0,
                array_merge(
                    $message->getRecipients(),
                    $message->getCopyRecipients(),
                    $message->getBlindCopyRecipients()
                ),
                $e
            );

            $this->logFailedStatus($transport, $message, $status);
        }

        return $status;
    }

    /**
     * Turn a recipient list into a comma separated string for log messages.
     *
     * Entries with a numeric key contribute their value, all other entries
     * contribute their key (presumably the email address).
     *
     * @param array $recipients
     *
     * @return string
     */
    protected function prepareRecipientsForLogging(array $recipients)
    {
        $recipientNames = array();
        foreach ($recipients as $email => $name) {
            $recipientNames[] = is_numeric($email) ? $name : $email;
        }
        return implode(', ', $recipientNames);
    }

    /**
     * Log the start of a transport and dispatch the pre transport event.
     *
     * @param TransportInterface $transport
     * @param MessageInterface   $message
     */
    protected function logPreTransportStatus(TransportInterface $transport, MessageInterface $message)
    {
        if ($this->logger) {
            $recipients = $this->prepareRecipientsForLogging($message->getRecipients());
            // Starting a transport is routine, not an error condition.
            $this->logger->debug(
                sprintf(
                    'Begin transport of message "%s" to %s via transport "%s"',
                    $message->getSubject(),
                    $recipients,
                    get_class($transport)
                ),
                array(
                    'message' => $message,
                )
            );
        }

        if ($this->eventDispatcher) {
            $this->eventDispatcher->dispatch(
                'avisota_queue_execution_pre_transport',
                new PreTransportMessageEvent($message, $this)
            );
        }
    }

    /**
     * Log the status returned by a transport and dispatch the post transport event.
     *
     * Logs a warning when some recipients failed while others succeeded, an
     * error when recipients failed and nothing was sent, and a debug message
     * when no recipient failed.
     *
     * @param TransportInterface $transport
     * @param MessageInterface   $message
     * @param TransportStatus    $status
     */
    protected function logSuccessfulStatus(
        TransportInterface $transport,
        MessageInterface $message,
        TransportStatus $status
    ) {
        if ($this->logger) {
            $recipients = $this->prepareRecipientsForLogging($message->getRecipients());
            $hasFailed  = count($status->getFailedRecipients()) > 0;

            if (!$hasFailed) {
                $this->logger->debug(
                    sprintf(
                        'Transport message "%s" to %s via transport "%s" succeed',
                        $message->getSubject(),
                        $recipients,
                        get_class($transport)
                    ),
                    array(
                        'message' => $message,
                        'status'  => $status,
                    )
                );
            }
            else {
                $failedRecipients = $this->prepareRecipientsForLogging($status->getFailedRecipients());
                $reason           = $status->getException()
                    ? $status->getException()->getMessage()
                    : 'no message';
                $context          = array(
                    'message'   => $message,
                    'status'    => $status,
                    'exception' => $status->getException(),
                );

                if ($status->getSuccessfullySend() > 0) {
                    $this->logger->warning(
                        sprintf(
                            'Transport message "%s" to %s via transport "%s" partial succeeded, failed for: %s with %s',
                            $message->getSubject(),
                            $recipients,
                            get_class($transport),
                            $failedRecipients,
                            $reason
                        ),
                        $context
                    );
                }
                else {
                    $this->logger->error(
                        sprintf(
                            'Transport message "%s" to %s via transport "%s" failed for: %s with %s',
                            $message->getSubject(),
                            $recipients,
                            get_class($transport),
                            $failedRecipients,
                            $reason
                        ),
                        $context
                    );
                }
            }
        }

        if ($this->eventDispatcher) {
            $this->eventDispatcher->dispatch(
                'avisota_queue_execution_post_transport',
                new PostTransportMessageEvent($message, $this, $status)
            );
        }
    }

    /**
     * Log a transport that threw an exception and dispatch the post transport event.
     *
     * @param TransportInterface $transport
     * @param MessageInterface   $message
     * @param TransportStatus    $status    Status carrying the exception.
     */
    protected function logFailedStatus(
        TransportInterface $transport,
        MessageInterface $message,
        TransportStatus $status
    ) {
        if ($this->logger) {
            $recipients = $this->prepareRecipientsForLogging($message->getRecipients());
            $this->logger->error(
                sprintf(
                    'Could not transport message "%s" to %s via transport "%s": %s',
                    $message->getSubject(),
                    $recipients,
                    get_class($transport),
                    $status->getException()->getMessage()
                ),
                array(
                    'message' => $message,
                )
            );
        }

        if ($this->eventDispatcher) {
            $this->eventDispatcher->dispatch(
                'avisota_queue_execution_post_transport',
                new PostTransportMessageEvent($message, $this, $status)
            );
        }
    }

    /**
     * Add a message to the queue.
     *
     * @param MessageInterface $message      Message to store; it is serialized.
     * @param \DateTime|null   $deliveryDate Earliest time the message may be transported,
     *                                       or null for no restriction.
     *
     * @return bool True when a row was inserted.
     */
    public function enqueue(MessageInterface $message, \DateTime $deliveryDate = null)
    {
        $affectedRows = $this->connection->insert(
            $this->tableName,
            array(
                'enqueue'       => date('Y-m-d H:i:s'),
                'message'       => serialize($message),
                // A DateTime object cannot be bound as a parameter without
                // type information, so store it in the textual format that
                // transport() parses again.
                'delivery_date' => $deliveryDate ? $deliveryDate->format('Y-m-d H:i:s') : null,
            )
        );

        return (bool) $affectedRows;
    }

    /**
     * Remove a message from the queue.
     *
     * Deletes every row whose stored payload equals the serialized form of
     * the given message.
     *
     * @param MessageInterface $message
     *
     * @return bool True when at least one row was deleted.
     */
    public function dequeue(MessageInterface $message)
    {
        $affectedRows = $this->connection->delete(
            $this->tableName,
            array(
                'message' => serialize($message),
            )
        );

        return (bool) $affectedRows;
    }
}