/*
 * Decompiled with CFR 0.152.
 */
package com.bokesoft.distro.tech.yigosupport.extension.coordinate.impl;

import com.bokesoft.distro.tech.commons.basis.coordinate.SemaphoreEventBus;
import com.bokesoft.distro.tech.commons.basis.coordinate.intf.ISemaphoreChannel;
import com.bokesoft.distro.tech.commons.basis.coordinate.struct.Semaphore;
import com.bokesoft.distro.tech.yigosupport.extension.coordinate.intf.RedisCommands;
import com.bokesoft.distro.tech.yigosupport.extension.coordinate.intf.RedisFactory;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisPubSub;

public class RedisSemaphoreChannel
implements ISemaphoreChannel {
    private static final String CHANNEL_PREFIX = "YIGO_SEMAPHORE.";
    private static final String CHANNEL_PATTERN = "YIGO_SEMAPHORE.*";
    private static final Logger log = LoggerFactory.getLogger(RedisSemaphoreChannel.class);
    private final RedisFactory factory;
    private final RedisConsumer consumer;
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread th = new Thread(r);
        th.setDaemon(true);
        th.setName("RedisSemaphoreChannel.YIGO_SEMAPHORE.");
        return th;
    });
    private final SemaphoreEventBus eventBus;

    public RedisSemaphoreChannel(SemaphoreEventBus eventBus, RedisFactory factory) {
        this.eventBus = eventBus;
        this.factory = factory;
        this.consumer = new RedisConsumer(factory);
        this.executor.execute(this.consumer);
    }

    public void doSend(Semaphore semaphore) {
        try (RedisCommands cmd = this.factory.getRedis();){
            cmd.publish(CHANNEL_PREFIX + semaphore.key, semaphore.toJSONString());
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private class RedisConsumer
    extends JedisPubSub
    implements Runnable {
        private final RedisFactory factory;

        private RedisConsumer(RedisFactory factory) {
            this.factory = factory;
        }

        public void onPSubscribe(String pattern, int subscribedChannels) {
            super.onPSubscribe(pattern, subscribedChannels);
            log.info("\u4fe1\u53f7\u91cf\u76d1\u542c\u6210\u529f!");
        }

        public void onPMessage(String pattern, String channel, String message) {
            super.onPMessage(pattern, channel, message);
            try {
                String sema = channel.substring(RedisSemaphoreChannel.CHANNEL_PREFIX.length());
                Semaphore semaphore = Semaphore.fromJSONString((String)message);
                RedisSemaphoreChannel.this.eventBus.onReceiver(semaphore);
            }
            catch (Exception e) {
                log.warn(String.format("\u4fe1\u53f7\u91cf[%s] \u5904\u7406\u5931\u8d25! message= %s ", channel, message), (Throwable)e);
            }
        }

        public void onPUnsubscribe(String pattern, int subscribedChannels) {
            super.onPUnsubscribe(pattern, subscribedChannels);
            log.warn("\u4fe1\u53f7\u91cf\u76d1\u542c\u88ab\u53d6\u6d88!");
        }

        @Override
        public void run() {
            log.info("\u5f00\u59cb\u76d1\u542c\u4fe1\u53f7\u91cf...");
            try (RedisCommands commands = this.factory.getRedis();){
                commands.psubscribe(this, RedisSemaphoreChannel.CHANNEL_PATTERN);
            }
            catch (Exception e) {
                log.warn("\u4fe1\u53f7\u91cf\u76d1\u542c\u5f02\u5e38\u4e2d\u65ad", (Throwable)e);
            }
            log.warn("\u4fe1\u53f7\u91cf\u76d1\u542c,\u91cd\u8bd5\u4e2d...");
            RedisSemaphoreChannel.this.executor.schedule(this, 3L, TimeUnit.SECONDS);
        }
    }
}

