package com.bokesoft.distro.tech.yigosupport.extension.coordinate.impl;

import com.bokesoft.distro.tech.commons.basis.coordinate.SemaphoreEventBus;
import com.bokesoft.distro.tech.commons.basis.coordinate.intf.ISemaphoreChannel;
import com.bokesoft.distro.tech.commons.basis.coordinate.struct.Semaphore;
import com.bokesoft.distro.tech.yigosupport.extension.coordinate.intf.RedisCommands;
import com.bokesoft.distro.tech.yigosupport.extension.coordinate.intf.RedisFactory;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisPubSub;

/* loaded from: input_file:com/bokesoft/distro/tech/yigosupport/extension/coordinate/impl/RedisSemaphoreChannel.class */
public class RedisSemaphoreChannel implements ISemaphoreChannel {
    private static final String CHANNEL_PREFIX = "YIGO_SEMAPHORE.";
    private static final String CHANNEL_PATTERN = "YIGO_SEMAPHORE.*";
    private static final Logger log = LoggerFactory.getLogger(RedisSemaphoreChannel.class);
    private final RedisFactory factory;
    private final RedisConsumer consumer;
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(runnable -> {
        Thread thread = new Thread(runnable);
        thread.setDaemon(true);
        thread.setName("RedisSemaphoreChannel.YIGO_SEMAPHORE.");
        return thread;
    });
    private final SemaphoreEventBus eventBus;

    /* loaded from: input_file:com/bokesoft/distro/tech/yigosupport/extension/coordinate/impl/RedisSemaphoreChannel$RedisConsumer.class */
    private class RedisConsumer extends JedisPubSub implements Runnable {
        private final RedisFactory factory;

        private RedisConsumer(RedisFactory redisFactory) {
            this.factory = redisFactory;
        }

        public void onPSubscribe(String str, int i) {
            super.onPSubscribe(str, i);
            RedisSemaphoreChannel.log.info("信号量监听成功!");
        }

        public void onPMessage(String str, String str2, String str3) {
            super.onPMessage(str, str2, str3);
            try {
                str2.substring(RedisSemaphoreChannel.CHANNEL_PREFIX.length());
                RedisSemaphoreChannel.this.eventBus.onReceiver(Semaphore.fromJSONString(str3));
            } catch (Exception e) {
                RedisSemaphoreChannel.log.warn(String.format("信号量[%s] 处理失败! message= %s ", str2, str3), e);
            }
        }

        public void onPUnsubscribe(String str, int i) {
            super.onPUnsubscribe(str, i);
            RedisSemaphoreChannel.log.warn("信号量监听被取消!");
        }

        @Override // java.lang.Runnable
        public void run() {
            RedisSemaphoreChannel.log.info("开始监听信号量...");
            try {
                RedisCommands redis = this.factory.getRedis();
                Throwable th = null;
                try {
                    redis.psubscribe(this, RedisSemaphoreChannel.CHANNEL_PATTERN);
                    if (redis != null) {
                        if (0 != 0) {
                            try {
                                redis.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            redis.close();
                        }
                    }
                } finally {
                }
            } catch (Exception e) {
                RedisSemaphoreChannel.log.warn("信号量监听异常中断", e);
            }
            RedisSemaphoreChannel.log.warn("信号量监听,重试中...");
            RedisSemaphoreChannel.this.executor.schedule(this, 3L, TimeUnit.SECONDS);
        }
    }

    public RedisSemaphoreChannel(SemaphoreEventBus semaphoreEventBus, RedisFactory redisFactory) {
        this.eventBus = semaphoreEventBus;
        this.factory = redisFactory;
        this.consumer = new RedisConsumer(redisFactory);
        this.executor.execute(this.consumer);
    }

    public void doSend(Semaphore semaphore) {
        try {
            RedisCommands redis = this.factory.getRedis();
            Throwable th = null;
            try {
                try {
                    redis.publish(CHANNEL_PREFIX + semaphore.key, semaphore.toJSONString());
                    if (redis != null) {
                        if (0 != 0) {
                            try {
                                redis.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            redis.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
