Overview

Namespaces

  • Copro
    • Adapters
  • Donut
    • Adapters
    • AtomFeed
    • Facebook
    • Instagram
    • PostFeed
    • Twitter

Classes

  • Copro\Adapters\DibiMysqlAdapter
  • Donut\Adapters\DibiMysqlAdapter
  • Donut\Adapters\DibiSqliteAdapter
  • Donut\Adapters\MemoryAdapter
  • Donut\AtomFeed\AtomFeedItem
  • Donut\AtomFeed\ConvertItemToFacebookPost
  • Donut\AtomFeed\ConvertItemToInstagramPost
  • Donut\AtomFeed\ConvertItemToTweet
  • Donut\AtomFeed\FetchNewItems
  • Donut\DefaultCurrentTimeFactory
  • Donut\Facebook\FacebookApi
  • Donut\Facebook\FacebookPost
  • Donut\Facebook\PublishFacebookPost
  • Donut\Helpers
  • Donut\Instagram\InstagramApi
  • Donut\Instagram\InstagramPost
  • Donut\Instagram\PublishInstagramPost
  • Donut\Manager
  • Donut\Message
  • Donut\Period
  • Donut\PostFeed\ConvertItemToFacebookPost
  • Donut\PostFeed\ConvertItemToInstagramPost
  • Donut\PostFeed\ConvertItemToTweet
  • Donut\PostFeed\FetchNewItems
  • Donut\PostFeed\PostFeedItem
  • Donut\Processor
  • Donut\ProducerInfo
  • Donut\Queue
  • Donut\Time
  • Donut\Twitter\PublishTweet
  • Donut\Twitter\Tweet
  • Donut\Twitter\TwitterApi

Interfaces

  • Donut\IAdapter
  • Donut\ICurrentTimeFactory
  • Donut\IProducer
  • Donut\IWorker

Exceptions

  • Donut\Exception
  • Donut\InvalidArgumentException
  • Donut\InvalidStateException
  • Overview
  • Namespace
  • Class
  1:   2:   3:   4:   5:   6:   7:   8:   9:  10:  11:  12:  13:  14:  15:  16:  17:  18:  19:  20:  21:  22:  23:  24:  25:  26:  27:  28:  29:  30:  31:  32:  33:  34:  35:  36:  37:  38:  39:  40:  41:  42:  43:  44:  45:  46:  47:  48:  49:  50:  51:  52:  53:  54:  55:  56:  57:  58:  59:  60:  61:  62:  63:  64:  65:  66:  67:  68:  69:  70:  71:  72:  73:  74:  75:  76:  77:  78:  79:  80:  81:  82:  83:  84:  85:  86:  87:  88:  89:  90:  91:  92:  93:  94:  95:  96:  97:  98:  99: 100: 101: 102: 103: 104: 105: 106: 107: 108: 109: 110: 111: 112: 113: 114: 115: 116: 117: 118: 119: 120: 121: 122: 123: 124: 125: 126: 127: 128: 129: 130: 131: 132: 133: 134: 135: 136: 137: 138: 139: 140: 141: 142: 143: 144: 145: 146: 147: 148: 149: 150: 151: 152: 153: 154: 155: 156: 157: 158: 159: 160: 161: 162: 163: 164: 165: 166: 167: 168: 169: 170: 171: 172: 173: 174: 175: 176: 177: 178: 179: 180: 181: 182: 183: 184: 185: 186: 187: 188: 189: 190: 191: 192: 193: 194: 195: 196: 197: 198: 199: 200: 201: 202: 203: 204: 205: 206: 207: 208: 209: 210: 211: 212: 213: 214: 215: 216: 217: 218: 219: 220: 221: 222: 223: 224: 225: 226: 227: 228: 229: 230: 231: 232: 233: 234: 235: 236: 237: 238: 239: 240: 241: 242: 243: 244: 245: 246: 247: 248: 249: 250: 251: 252: 253: 254: 255: 256: 257: 258: 259: 260: 261: 262: 263: 264: 265: 266: 267: 268: 269: 270: 271: 272: 273: 274: 275: 276: 277: 278: 279: 280: 281: 282: 283: 284: 285: 286: 287: 288: 289: 290: 291: 292: 293: 294: 295: 296: 297: 298: 299: 300: 301: 302: 303: 304: 305: 306: 307: 308: 309: 
<?php

    namespace Copro\Adapters;

    use DibiConnection;
    use Copro\Event;
    use Copro\IAdapter;
    use Copro\IEntity;
    use Copro\IChannel;
    use Copro\ILogger;
    use Copro\IProducer;
    use Copro\ChannelInfo;
    use Copro\Utils\Time;
    use Nette\Utils\Json;
    use Nette\Utils\Strings;


    /**
     * Storage adapter and logger backed by a MySQL database accessed through Dibi.
     *
     * Tables touched by this class: [log], [event], [event_done], [event_fail],
     * [producer], [channel] and [channel_entry].
     */
    class DibiMysqlAdapter implements IAdapter, ILogger
    {
        /** @var DibiConnection */
        private $connection;

        /** @var array  [name => id] cache of channel IDs resolved by getChannelId() */
        private $channels = array();


        /**
         * @param  DibiConnection|mixed $connection  an existing connection, or a value
         *         handed to the DibiConnection constructor (presumably a config array - confirm)
         */
        public function __construct($connection)
        {
            if (!($connection instanceof DibiConnection)) {
                $connection = new DibiConnection($connection);
            }
            $this->connection = $connection;
        }


        /**
         * Writes one row into the [log] table.
         *
         * @param  string $subject
         * @param  string|NULL $message
         * @param  Event|NULL $event  when given, its ID is stored in [event_id]
         * @return void
         */
        public function log($subject, $message = NULL, Event $event = NULL)
        {
            $this->connection->query('INSERT INTO [log]', array(
                'time' => $this->getTime(),
                'subject' => $subject,
                'message' => $message,
                'event_id' => $event !== NULL ? $event->getId() : NULL,
                'token' => Strings::random(32),
            ));
        }


        /**
         * No-op in this adapter.
         *
         * @return void
         */
        public function begin()
        {
        }


        /**
         * No-op in this adapter; failEvent() and doneEvent() run their own transactions.
         *
         * @return void
         */
        public function finish()
        {
        }


        /**
         * Stores a new, unprocessed event for the given channel.
         *
         * @param  IChannel $channel
         * @param  IEntity $entity  its type is stored as a string, its data as JSON
         * @param  bool $persistent  stored as 0/1
         * @return void
         * @throws \RuntimeException  NOTE(review): kept from the original docblock; nothing in this method throws it directly
         */
        public function createEvent(IChannel $channel, IEntity $entity, $persistent)
        {
            $this->connection->query('INSERT INTO [event]', array(
                'channel_id' => $this->getChannelId($channel),
                'type' => (string) $entity->getType(),
                'data' => Json::encode($entity->getData()),
                'createdAt' => $this->getTime(),
                'processed' => NULL,
                'persistent' => (int) (bool) $persistent,
            ));
        }


        /**
         * Returns the pending event with the lowest ID, or FALSE when the [event] table is empty.
         *
         * @return Event|FALSE
         */
        public function getEvent()
        {
            // ORDER BY makes the pick deterministic; with a bare LIMIT 1 the row
            // returned depends on the execution plan chosen for the join.
            $row = $this->connection->fetch('
                SELECT [event].*, [channel].[name] AS [channel_name] FROM [event]
                JOIN [channel] ON [event].[channel_id] = [channel].[id]
                ORDER BY [event].[id]
                LIMIT 1
            ');

            // "No row" is FALSE or NULL depending on the Dibi version; a row object is always truthy.
            if (!$row) {
                return FALSE;
            }

            return new Event(
                $row->id,
                $row->channel_name,
                $row->type,
                Json::decode($row->data, Json::FORCE_ARRAY),
                $row->createdAt,
                (bool) $row->persistent
            );
        }


        /**
         * Moves the event from [event] to [event_fail].
         *
         * @param  Event $event
         * @return void
         * @throws \Exception  any error raised during the move, rethrown after rollback
         */
        public function failEvent(Event $event)
        {
            $this->removeEvent($event, 'event_fail');
        }


        /**
         * Removes the event from [event]; persistent events are first copied to [event_done].
         *
         * @param  Event $event
         * @return void
         * @throws \Exception  any error raised during the move, rethrown after rollback
         */
        public function doneEvent(Event $event)
        {
            $this->removeEvent($event, $event->isPersistent() ? 'event_done' : NULL);
        }


        /**
         * Deletes the event from [event] inside a transaction, optionally archiving it first.
         *
         * @param  Event $event
         * @param  string|NULL $archiveTable  table to copy the event into, or NULL to just delete it
         * @return void
         * @throws \Exception  the original error, after the transaction has been rolled back
         */
        private function removeEvent(Event $event, $archiveTable)
        {
            try {
                $this->connection->begin();

                if ($archiveTable !== NULL) {
                    $this->copyEvent($archiveTable, $event);
                    $this->markEventAsProcessed($archiveTable, $event);
                }

                $this->deleteEvent($event);
                $this->connection->commit();

            } catch (\Exception $e) {
                $this->connection->rollback();
                throw $e;
            }
        }


        /**
         * Copies the event row into another table via INSERT ... SELECT *.
         *
         * @param  string $tableName  target table; SELECT * requires it to accept the [event] columns as they are
         * @param  Event $event
         * @return void
         */
        private function copyEvent($tableName, Event $event)
        {
            $this->connection->query('INSERT INTO %n SELECT * FROM [event] WHERE [id] = %i', $tableName, $event->getId());
        }


        /**
         * Sets [processed] to the current time on the event's row in the given table.
         *
         * @param  string $tableName
         * @param  Event $event
         * @return void
         */
        private function markEventAsProcessed($tableName, Event $event)
        {
            $this->connection->query('UPDATE %n SET [processed] = %t WHERE [id] = %i',
                $tableName,
                $this->getTime(),
                $event->getId()
            );
        }


        /**
         * Deletes the event's row from [event].
         *
         * @param  Event $event
         * @return void
         */
        private function deleteEvent(Event $event)
        {
            $this->connection->query('DELETE FROM [event] WHERE [id] = %i', $event->getId());
        }


        /**
         * Current time as provided by Copro\Utils\Time.
         *
         * @return \DateTime
         */
        private function getTime()
        {
            return Time::getCurrentTime();
        }


        /**
         * Returns the stored last-run time of the producer, or NULL when none is stored.
         * Producers are keyed by the MD5 hash of their class name.
         *
         * @param  IProducer $producer
         * @return \DateTime|NULL
         */
        public function getProducerLastRun(IProducer $producer)
        {
            $name = md5(get_class($producer));
            $value = $this->connection->fetchSingle('SELECT [lastrun] FROM [producer] WHERE [name] = %s', $name);

            // FALSE means "no row"; a NULL value is passed through unchanged.
            return $value === FALSE ? NULL : $value;
        }


        /**
         * Stores the last-run time of the producer: inserts a row, or updates the
         * existing one when the insert hits a duplicate key.
         *
         * @param  IProducer $producer
         * @param  \DateTime $lastrun
         * @return void
         * @throws \DibiDriverException  on any database error other than a duplicate key
         */
        public function setProducerLastRun(IProducer $producer, \DateTime $lastrun)
        {
            $name = md5(get_class($producer));

            try {
                $this->connection->query('INSERT INTO [producer]', array(
                    'name' => $name,
                    'lastrun' => $lastrun,
                ));

            } catch (\DibiDriverException $e) {
                if (!$this->isDuplicateEntry($e)) {
                    throw $e;
                }

                $this->connection->query('UPDATE [producer] SET lastrun = %t WHERE [name] = %s', $lastrun, $name);
            }
        }


        /**
         * Loads the [lastcheck] value of the channel, creating the channel row when missing.
         *
         * @param  IChannel $channel
         * @return ChannelInfo
         */
        public function getChannelInfo(IChannel $channel)
        {
            $lastcheck = $this->connection->fetchSingle('SELECT [lastcheck] FROM [channel] WHERE [id] = %i', $this->getChannelId($channel));
            return new ChannelInfo($lastcheck);
        }


        /**
         * Overwrites the [lastcheck] value of the channel.
         *
         * @param  IChannel $channel
         * @param  \DateTime|NULL $lastcheck
         * @return void
         */
        public function updateChannelInfo(IChannel $channel, \DateTime $lastcheck = NULL)
        {
            $this->connection->query('UPDATE [channel] SET [lastcheck] = %t WHERE [id] = %i', $lastcheck, $this->getChannelId($channel));
        }


        /**
         * Checks whether an entry is already recorded for the channel.
         * Entries are matched by the MD5 hash of the entry ID.
         *
         * @param  IChannel $channel
         * @param  string $entryId
         * @return bool
         */
        public function existsChannelEntry(IChannel $channel, $entryId)
        {
            $id = $this->connection->fetchSingle(
                'SELECT [id] FROM [channel_entry] WHERE [channel_id] = %i AND [uniq] = %s LIMIT 1',
                $this->getChannelId($channel),
                md5($entryId)
            );

            // "No row" is FALSE or NULL depending on the Dibi version.
            return $id !== FALSE && $id !== NULL;
        }


        /**
         * Records an entry for the channel; an already recorded entry is silently kept as is.
         *
         * @param  IChannel $channel
         * @param  string $entryId
         * @param  \DateTime $date
         * @return void
         * @throws \DibiDriverException  on any database error other than a duplicate key
         */
        public function saveChannelEntry(IChannel $channel, $entryId, \DateTime $date)
        {
            $data = array(
                'channel_id' => $this->getChannelId($channel),
                'uniq' => md5($entryId),
                'date' => $date,
            );

            try {
                $this->connection->query('INSERT INTO [channel_entry]', $data);

            } catch (\DibiDriverException $e) {
                if (!$this->isDuplicateEntry($e)) {
                    throw $e;
                }
            }
        }


        /**
         * Tells whether a driver exception reports a MySQL duplicate-key violation.
         *
         * The error number is checked first because the message text is not stable:
         * a driver may prefix it (e.g. with an SQLSTATE) and the server may localize it.
         * The original message-prefix test is kept as a fallback.
         *
         * @param  \DibiDriverException $e
         * @return bool
         */
        private function isDuplicateEntry(\DibiDriverException $e)
        {
            // 1062 = MySQL ER_DUP_ENTRY
            return (int) $e->getCode() === 1062
                || substr($e->getMessage(), 0, 15) === 'Duplicate entry';
        }


        /**
         * Resolves the database ID of a channel by its name, inserting a new
         * [channel] row when none exists. Results are cached per adapter instance.
         *
         * @param  IChannel $channel
         * @return int|string  NOTE(review): exact type depends on what Dibi returns for the [id] column
         */
        private function getChannelId(IChannel $channel)
        {
            $name = $channel->getName();

            if (!isset($this->channels[$name])) {
                $id = $this->connection->fetchSingle('SELECT [id] FROM [channel] WHERE [name] = %s', $name);

                // "No row" is FALSE or NULL depending on the Dibi version.
                if ($id === FALSE || $id === NULL) { // new channel
                    $this->connection->query('INSERT INTO [channel]', array(
                        'name' => $name,
                        'createdAt' => $this->getTime(),
                        'lastcheck' => NULL,
                    ));
                    $id = $this->connection->getInsertId();
                }

                $this->channels[$name] = $id;
            }

            return $this->channels[$name];
        }
    }
donut-org/donut v0.8.0 API documentation, generated by ApiGen