Overview

Namespaces

  • Copro
    • Adapters
  • Donut
    • Adapters
    • AtomFeed
    • Facebook
    • Instagram
    • PostFeed
    • Twitter

Classes

  • Copro\Adapters\DibiMysqlAdapter
  • Donut\Adapters\DibiMysqlAdapter
  • Donut\Adapters\DibiSqliteAdapter
  • Donut\Adapters\MemoryAdapter
  • Donut\AtomFeed\AtomFeedItem
  • Donut\AtomFeed\ConvertItemToFacebookPost
  • Donut\AtomFeed\ConvertItemToInstagramPost
  • Donut\AtomFeed\ConvertItemToTweet
  • Donut\AtomFeed\FetchNewItems
  • Donut\DefaultCurrentTimeFactory
  • Donut\Facebook\FacebookApi
  • Donut\Facebook\FacebookPost
  • Donut\Facebook\PublishFacebookPost
  • Donut\Helpers
  • Donut\Instagram\InstagramApi
  • Donut\Instagram\InstagramPost
  • Donut\Instagram\PublishInstagramPost
  • Donut\Manager
  • Donut\Message
  • Donut\Period
  • Donut\PostFeed\ConvertItemToFacebookPost
  • Donut\PostFeed\ConvertItemToInstagramPost
  • Donut\PostFeed\ConvertItemToTweet
  • Donut\PostFeed\FetchNewItems
  • Donut\PostFeed\PostFeedItem
  • Donut\Processor
  • Donut\ProducerInfo
  • Donut\Queue
  • Donut\Time
  • Donut\Twitter\PublishTweet
  • Donut\Twitter\Tweet
  • Donut\Twitter\TwitterApi

Interfaces

  • Donut\IAdapter
  • Donut\ICurrentTimeFactory
  • Donut\IProducer
  • Donut\IWorker

Exceptions

  • Donut\Exception
  • Donut\InvalidArgumentException
  • Donut\InvalidStateException
  • Overview
  • Namespace
  • Class
  1:   2:   3:   4:   5:   6:   7:   8:   9:  10:  11:  12:  13:  14:  15:  16:  17:  18:  19:  20:  21:  22:  23:  24:  25:  26:  27:  28:  29:  30:  31:  32:  33:  34:  35:  36:  37:  38:  39:  40:  41:  42:  43:  44:  45:  46:  47:  48:  49:  50:  51:  52:  53:  54:  55:  56:  57:  58:  59:  60:  61:  62:  63:  64:  65:  66:  67:  68:  69:  70:  71:  72:  73:  74:  75:  76:  77:  78:  79:  80:  81:  82:  83:  84:  85:  86:  87:  88:  89:  90:  91:  92:  93:  94:  95:  96:  97:  98:  99: 100: 101: 102: 103: 104: 105: 106: 107: 108: 109: 110: 111: 112: 113: 114: 115: 116: 117: 118: 119: 120: 121: 122: 123: 124: 125: 126: 127: 128: 129: 130: 131: 132: 133: 134: 135: 136: 137: 138: 139: 140: 141: 142: 143: 144: 145: 146: 147: 148: 149: 150: 151: 152: 153: 154: 155: 156: 157: 158: 159: 160: 161: 162: 163: 164: 165: 166: 167: 168: 169: 170: 171: 172: 173: 174: 175: 176: 177: 178: 179: 180: 181: 182: 183: 184: 185: 186: 187: 188: 189: 190: 191: 192: 193: 194: 195: 196: 197: 198: 199: 200: 201: 202: 203: 204: 205: 206: 207: 208: 209: 210: 211: 212: 213: 214: 215: 
<?php

    namespace Donut;

    use CzProject\Logger;


    /**
     * Drives the queue: runs at most one due producer and then processes
     * at most one fetched message per cycle, dispatching the message to
     * every worker registered for its queue.
     */
    class Processor
    {
        /** @var Logger\ILogger */
        private $logger;

        /** @var IAdapter */
        private $adapter;

        /** @var ICurrentTimeFactory */
        private $currentTimeFactory;

        /** @var Manager */
        private $manager;

        /** @var callback|NULL  invoked whenever the adapter has no message to process */
        private $noMessageHandler;

        /** @var ProducerInfo[] */
        private $producers = array();

        /** @var array  [queue => IWorker[]] */
        private $workers = array();


        /**
         * @param  IAdapter $adapter  storage used for messages, producer run times and the persistent log
         * @param  callback|NULL $noMessageHandler  called (without arguments) when no message is available
         * @param  Logger\ILogger|NULL $logger  progress logger; defaults to an OutputLogger at INFO level
         */
        public function __construct(IAdapter $adapter, $noMessageHandler = NULL, Logger\ILogger $logger = NULL)
        {
            $this->logger = $logger ? $logger : new Logger\OutputLogger(Logger\ILogger::INFO);
            $this->adapter = $adapter;
            $this->currentTimeFactory = new DefaultCurrentTimeFactory;
            $this->manager = new Manager($adapter, $this->currentTimeFactory, $this->logger);
            $this->noMessageHandler = $noMessageHandler;
        }


        /**
         * Registers a producer, optionally with the period that limits how often it runs.
         *
         * @return static
         */
        public function addProducer(IProducer $producer, Period $period = NULL)
        {
            $this->producers[] = new ProducerInfo($producer, $period);
            return $this;
        }


        /**
         * Registers a worker for the given queue; a queue may have several workers.
         *
         * @return static
         */
        public function addWorker($queue, IWorker $worker)
        {
            $this->workers[$queue][] = $worker;
            return $this;
        }


        /**
         * Runs the producer/message cycle.
         *
         * An integer $cycles is decremented after every cycle and the loop ends
         * when it reaches zero. Any other truthy value (e.g. TRUE) is never
         * decremented, so the loop keeps running.
         *
         * @return void
         */
        public function run($cycles = 1)
        {
            while ($cycles) {
                $this->processProducers();
                $this->processMessage();

                if (is_int($cycles)) {
                    $cycles--;
                }
            }
        }


        /**
         * Writes a record to the adapter's log, stamped with the current time.
         */
        private function log($subject, $text = NULL, Message $message = NULL)
        {
            $this->adapter->log($subject, $text, $message, $this->currentTimeFactory->createTime());
        }


        /**
         * Builds the multi-line failure description stored in the adapter's log:
         * message, file, line, a separator and the stack trace.
         *
         * @return string
         */
        private function formatException(\Exception $e)
        {
            return implode("\n", array(
                $e->getMessage(),
                'in file ' . $e->getFile(),
                'on line ' . $e->getLine(),
                '-----------------------------',
                $e->getTraceAsString(),
            ));
        }


        /**
         * Runs the first producer that is due (if any) and records its run time.
         * A producer that throws is logged; its last-run time is not updated.
         */
        private function processProducers()
        {
            $producer = $this->getProducerToRun();

            if (!$producer) {
                return;
            }

            try {
                $this->logger->log('Run producer ' . $producer->getUniqueId());
                $producer->run($this->manager, $this->adapter->getProducerLastRun($producer));
                $this->adapter->saveProducerLastRun($producer, $this->currentTimeFactory->createTime());
                $this->logger->log('Producer done.');

            } catch (\Exception $e) {
                $this->logger->log('Producer failed.');
                $this->log('Producer ' . get_class($producer) . ' failed', $this->formatException($e));
            }
        }


        /**
         * Finds the first registered producer that is allowed to run now.
         *
         * @return IProducer|NULL
         */
        private function getProducerToRun()
        {
            $currentTime = $this->currentTimeFactory->createTime();

            foreach ($this->producers as $info) {
                $producer = $info->producer;
                $period = $info->period;
                $lastrun = $this->adapter->getProducerLastRun($producer);

                if ($this->canBeRun($currentTime, $lastrun, $period)) { // take the first matching one
                    return $producer;
                }
            }

            return NULL;
        }


        /**
         * Decides whether a producer last run at $lastrun may run at $currentTime.
         *
         * @return bool
         */
        private function canBeRun(Time $currentTime, Time $lastrun = NULL, Period $period = NULL)
        {
            if ($lastrun === NULL) { // never run before
                return TRUE;
            }

            if ($currentTime->isOlderThan($lastrun)) {
                return FALSE; // skip those that were already run this minute, or were run in the future
            }

            if ($period === NULL) { // no period restriction
                return TRUE;
            }

            $interval = (int) $period->getInterval();

            if (!$interval) { // zero => run immediately
                return TRUE;
            }

            return $lastrun->getMinutesTo($currentTime) >= $interval;
        }


        /**
         * Fetches one message and hands it to every worker of its queue.
         *
         * The message is marked as failed when its queue has no workers or when
         * any worker throws; otherwise it is marked as done. A throwing worker
         * does not stop the remaining workers from being called.
         */
        private function processMessage()
        {
            $message = $this->adapter->fetchMessage();

            if (!$message) {
                $this->logger->log('No message to process.');
                if ($this->noMessageHandler !== NULL) {
                    call_user_func($this->noMessageHandler);
                }
                return;
            }

            $queue = $message->getQueue();
            $messageFail = TRUE;
            $hasWorkers = FALSE;
            $this->logger->log('Process message ' . $message->getId()->toString() . ' in queue ' . $queue);

            if (!empty($this->workers[$queue])) {
                $messageFail = FALSE;
                $hasWorkers = TRUE;
                $workers = $this->workers[$queue];

                foreach ($workers as $worker) {
                    try {
                        $worker->processMessage($message, $this->manager);

                    } catch (\Exception $e) {
                        $this->log("Worker " . get_class($worker) . " for '$queue' failed", $this->formatException($e), $message);
                        $messageFail = TRUE;
                    }
                }
            }

            if (!$hasWorkers) {
                $this->logger->log("Queue '$queue' has no workers.");
                $this->log("Queue '$queue' has no workers.");
            }

            if ($messageFail) {
                $this->logger->log('Message processing failed.');
                $this->log('Message processing failed', NULL, $message);
                $this->adapter->markAsFailed($message, $this->currentTimeFactory->createTime());

            } else {
                $this->logger->log('Message processing done.');
                $this->adapter->markAsDone($message, $this->currentTimeFactory->createTime());
            }
        }
    }
donut-org/donut v0.8.0 API documentation, generated by ApiGen