Overview

Namespaces

  • Copro
    • Adapters
  • Donut
    • Adapters
    • AtomFeed
    • Facebook
    • Instagram
    • PostFeed
    • Twitter

Classes

  • Copro\Adapters\DibiMysqlAdapter
  • Donut\Adapters\DibiMysqlAdapter
  • Donut\Adapters\DibiSqliteAdapter
  • Donut\Adapters\MemoryAdapter
  • Donut\AtomFeed\AtomFeedItem
  • Donut\AtomFeed\ConvertItemToFacebookPost
  • Donut\AtomFeed\ConvertItemToInstagramPost
  • Donut\AtomFeed\ConvertItemToTweet
  • Donut\AtomFeed\FetchNewItems
  • Donut\DefaultCurrentTimeFactory
  • Donut\Facebook\FacebookApi
  • Donut\Facebook\FacebookPost
  • Donut\Facebook\PublishFacebookPost
  • Donut\Helpers
  • Donut\Instagram\InstagramApi
  • Donut\Instagram\InstagramPost
  • Donut\Instagram\PublishInstagramPost
  • Donut\Manager
  • Donut\Message
  • Donut\Period
  • Donut\PostFeed\ConvertItemToFacebookPost
  • Donut\PostFeed\ConvertItemToInstagramPost
  • Donut\PostFeed\ConvertItemToTweet
  • Donut\PostFeed\FetchNewItems
  • Donut\PostFeed\PostFeedItem
  • Donut\Processor
  • Donut\ProducerInfo
  • Donut\Queue
  • Donut\Time
  • Donut\Twitter\PublishTweet
  • Donut\Twitter\Tweet
  • Donut\Twitter\TwitterApi

Interfaces

  • Donut\IAdapter
  • Donut\ICurrentTimeFactory
  • Donut\IProducer
  • Donut\IWorker

Exceptions

  • Donut\Exception
  • Donut\InvalidArgumentException
  • Donut\InvalidStateException
  • Overview
  • Namespace
  • Class
  1:   2:   3:   4:   5:   6:   7:   8:   9:  10:  11:  12:  13:  14:  15:  16:  17:  18:  19:  20:  21:  22:  23:  24:  25:  26:  27:  28:  29:  30:  31:  32:  33:  34:  35:  36:  37:  38:  39:  40:  41:  42:  43:  44:  45:  46:  47:  48:  49:  50:  51:  52:  53:  54:  55:  56:  57:  58:  59:  60:  61:  62:  63:  64:  65:  66:  67:  68:  69:  70:  71:  72:  73:  74:  75:  76:  77:  78:  79:  80:  81:  82:  83:  84:  85:  86:  87:  88:  89:  90:  91:  92:  93:  94:  95:  96:  97:  98:  99: 100: 101: 102: 103: 104: 105: 106: 107: 108: 109: 110: 111: 112: 113: 114: 115: 116: 117: 118: 119: 120: 121: 122: 123: 124: 125: 126: 127: 128: 129: 130: 131: 132: 133: 134: 135: 136: 137: 138: 139: 140: 141: 142: 143: 144: 145: 146: 147: 148: 149: 150: 151: 152: 153: 154: 155: 156: 157: 158: 159: 160: 161: 162: 163: 164: 165: 166: 167: 168: 169: 170: 171: 172: 173: 174: 175: 176: 177: 178: 179: 180: 181: 
<?php

    namespace Donut\Adapters;

    use Donut\IAdapter;
    use Donut\Message;
    use Donut\Time;
    use Nette\Utils\Json;
    use Ramsey\Uuid\Uuid;


    class DibiMysqlAdapter implements IAdapter
    {
        /** @var \Dibi\Connection */
        private $connection;


        public function __construct(\Dibi\Connection $connection)
        {
            $this->connection = $connection;
        }


        /**
         * @return void
         */
        public function createMessage(Message $message)
        {
            $this->connection->query('INSERT INTO [message]', array(
                'id' => $message->getId(),
                'queue' => $message->getQueue(),
                'data' => Json::encode($message->getData()),
                'created' => $message->getCreated(),
                'status' => $message->getStatus(),
                'processed' => $message->getProcessed(),
            ));
        }


        /**
         * @return Message|NULL
         */
        public function fetchMessage()
        {
            $row = $this->connection->fetch('SELECT * FROM [message] ORDER BY [created] LIMIT 1');

            if ($row === FALSE) {
                return NULL;
            }

            return new Message(
                $row->id,
                $row->queue,
                Json::decode($row->data, Json::FORCE_ARRAY),
                Time::create($row->created),
                $row->status,
                $row->processed ? Time::create($row->processed) : NULL
            );
        }


        /**
         * @return void
         */
        public function markAsDone(Message $message)
        {
            $this->connection->query(
                'UPDATE [message] SET [status] = %i, [processed] = %t WHERE [id] = %i',
                Message::STATUS_DONE,
                Time::getCurrentTime(),
                $message->getId()
            );
        }


        /**
         * @return void
         */
        public function markAsFailed(Message $message)
        {
            $this->connection->query(
                'UPDATE [message] SET [status] = %i, [processed] = %t WHERE [id] = %i',
                Message::STATUS_FAILED,
                Time::getCurrentTime(),
                $message->getId()
            );
        }


        /**
         * @param  string
         * @param  string|NULL
         * @param  Message|NULL
         * @return void
         */
        public function log($subject, $text = NULL, Message $message = NULL)
        {
            $this->connection->query('INSERT INTO [log]', array(
                'id' => Uuid::uuid1(),
                'subject' => $subject,
                'text' => $text,
                'message' => $message->getId(),
                'date' => Time::getCurrentTime(),
            ));
        }


        /**
         * @return Time|NULL
         */
        public function getProducerLastRun(IProducer $producer)
        {
            $row = $this->connection->fetch('SELECT * FROM [producer] WHERE [producer] = %s', $this->getProducerId($producer));

            if ($row === FALSE) {
                return NULL;
            }

            return Time::create($row->lastrun);
        }


        /**
         * @return void
         */
        public function saveProducerLastRun(IProducer $producer, Time $lastrun)
        {
            $producerId = $this->getProducerId($producer);

            try {
                $this->connection->query('INSERT INTO [producer]', array(
                    'producer' => $producerId,
                    'lastrun' => $lastrun,
                ));

            } catch (\DibiDriverException $e) {
                if (substr($e->getMessage(), 0, 15) !== 'Duplicate entry') {
                    throw $e;
                }

                $this->connection->query('UPDATE [producer] SET lastrun = %t WHERE [producer] = %s', $lastrun, $producerId);
            }
        }


        /**
         * @param  IProducer
         * @param  string
         * @return bool
         */
        public function hasProducerItem(IProducer $producer, $itemId)
        {
            return $this->connection->fetch('SELECT * FROM [producer_item] WHERE [producer] = %s AND [item-id] = %s', $this->getProducerId($producer), $itemId) !== FALSE;
        }


        /**
         * @param  IProducer
         * @param  string
         * @param  Time
         * @return void
         */
        public function saveProducerItem(IProducer $producer, $itemId, Time $date)
        {
            $this->connection->query('INSERT INTO [producer_item]', array(
                'producer' => $this->getProducerId($producer),
                'item_id' => $itemId,
                'date' => $date->getDateTime(),
            ));
        }


        /**
         * @return string
         */
        private function getProducerId()
        {
            return md5($producer->getName());
        }
    }
donut-org/donut v0.8.0 API documentation API documentation generated by ApiGen