package com.aliyun.iotx.edge.tunnel.core.common.io;

import com.aliyun.iotx.edge.tunnel.core.common.constant.LoggerName;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/iotx-edge-tunnel-core-1.0.0-SNAPSHOT.jar:com/aliyun/iotx/edge/tunnel/core/common/io/StreamSelector.class */
class StreamSelector {
    private static final Logger ERROR_LOGGER = LoggerFactory.getLogger(LoggerName.ERROR);
    private static final Field CLOSED_BY_WRITER;
    private static final Field CLOSED_BY_READER;
    private static final int SLEEP_INTERVAL_MIN = 0;
    private static final int SLEEP_INTERVAL_MAX = 100;
    private final Map<String, Stream> streams = Maps.newConcurrentMap();
    private final Set<String> removedStreamIds = Sets.newConcurrentHashSet();
    private final Map<String, StreamPromise> promises = Maps.newHashMap();
    private int sleepInterval = 0;
    private volatile boolean isActive = true;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/iotx-edge-tunnel-core-1.0.0-SNAPSHOT.jar:com/aliyun/iotx/edge/tunnel/core/common/io/StreamSelector$Stream.class */
    public static final class Stream {
        private final String id;
        private final InputStream inputStream;
        private final StreamCallBack streamCallBack;
        private final StreamWorker streamWorker;

        private Stream(InputStream inputStream, StreamCallBack streamCallBack, StreamWorker streamWorker) {
            this.id = UUID.randomUUID().toString();
            this.inputStream = inputStream;
            this.streamCallBack = streamCallBack;
            this.streamWorker = streamWorker;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public String getId() {
            return this.id;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public InputStream getInputStream() {
            return this.inputStream;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public StreamCallBack getStreamCallBack() {
            return this.streamCallBack;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public StreamWorker getStreamWorker() {
            return this.streamWorker;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void addStream(InputStream inputStream, StreamCallBack streamCallBack) {
        Stream stream = new Stream(inputStream, streamCallBack, StreamWorker.dispatch());
        this.streams.put(stream.getId(), stream);
        notifyAll();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int streamSize() {
        return this.streams.size();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isActive() {
        return this.isActive;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void startSelect() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                try {
                    synchronized (this) {
                        if (streamSize() == 0) {
                            try {
                                wait();
                            } catch (InterruptedException e) {
                            }
                        }
                    }
                    boolean z = false;
                    for (Stream stream : this.streams.values()) {
                        String id = stream.getId();
                        InputStream inputStream = stream.getInputStream();
                        StreamCallBack streamCallBack = stream.getStreamCallBack();
                        StreamWorker streamWorker = stream.getStreamWorker();
                        try {
                        } catch (Throwable th) {
                            ERROR_LOGGER.error("stream selector catch error. errorMsg={}", th.getMessage(), th);
                            this.removedStreamIds.add(stream.getId());
                        }
                        if (isClosed(inputStream)) {
                            this.removedStreamIds.add(stream.getId());
                        } else if (inputStream.available() > 0) {
                            z = true;
                            StreamPromise streamPromise = this.promises.get(id);
                            if (streamPromise == null || streamPromise.isDone()) {
                                try {
                                    this.promises.put(id, streamWorker.offerTask(() -> {
                                        try {
                                            streamCallBack.onSelected();
                                        } catch (Throwable th2) {
                                            ERROR_LOGGER.error("stream callback catch exception when execute onSelected. errorMsg={}", th2.getMessage(), th2);
                                            this.removedStreamIds.add(stream.getId());
                                        }
                                    }));
                                } catch (RejectedExecutionException e2) {
                                    ERROR_LOGGER.error("stream selector has been rejected. errorMsg={}", e2.getMessage(), e2);
                                }
                            }
                        }
                    }
                    for (String str : Lists.newArrayList(this.removedStreamIds)) {
                        Stream remove = this.streams.remove(str);
                        this.promises.remove(str);
                        if (remove != null) {
                            StreamCallBack streamCallBack2 = remove.getStreamCallBack();
                            try {
                                remove.getStreamWorker().offerTask(() -> {
                                    try {
                                        streamCallBack2.onRemoved();
                                    } catch (Throwable th2) {
                                        ERROR_LOGGER.error("stream callback catch exception when execute onRemoved. errorMsg={}", th2.getMessage(), th2);
                                    }
                                });
                                this.removedStreamIds.remove(str);
                            } catch (RejectedExecutionException e3) {
                                ERROR_LOGGER.error("stream selector has been rejected. errorMsg={}", e3.getMessage(), e3);
                            }
                        }
                    }
                    if (z) {
                        this.sleepInterval = 0;
                    } else {
                        int i = this.sleepInterval + 1;
                        this.sleepInterval = i;
                        if (i >= 100) {
                            this.sleepInterval = 100;
                        }
                    }
                    TimeUnit.MILLISECONDS.sleep(this.sleepInterval);
                } catch (Throwable th2) {
                    ERROR_LOGGER.error("stream selector catch error. errorMsg={}", th2.getMessage(), th2);
                    for (Stream stream2 : this.streams.values()) {
                        StreamCallBack streamCallBack3 = stream2.getStreamCallBack();
                        try {
                            stream2.getStreamWorker().offerTask(() -> {
                                try {
                                    streamCallBack3.onRemoved();
                                } catch (Throwable th3) {
                                    ERROR_LOGGER.error("stream callback catch exception when execute onRemoved. errorMsg={}", th3.getMessage(), th3);
                                }
                            });
                        } catch (RejectedExecutionException e4) {
                            ERROR_LOGGER.error("stream selector has been rejected. errorMsg={}", e4.getMessage(), e4);
                        }
                    }
                    this.isActive = false;
                    return;
                }
            } finally {
                for (Stream stream3 : this.streams.values()) {
                    StreamCallBack streamCallBack4 = stream3.getStreamCallBack();
                    try {
                        stream3.getStreamWorker().offerTask(() -> {
                            try {
                                streamCallBack4.onRemoved();
                            } catch (Throwable th3) {
                                ERROR_LOGGER.error("stream callback catch exception when execute onRemoved. errorMsg={}", th3.getMessage(), th3);
                            }
                        });
                    } catch (RejectedExecutionException e5) {
                        ERROR_LOGGER.error("stream selector has been rejected. errorMsg={}", e5.getMessage(), e5);
                    }
                }
                this.isActive = false;
            }
        }
    }

    private boolean isClosed(InputStream inputStream) {
        if (!(inputStream instanceof PipedInputStream)) {
            return false;
        }
        try {
            return ((Boolean) CLOSED_BY_WRITER.get(inputStream)).booleanValue() || ((Boolean) CLOSED_BY_READER.get(inputStream)).booleanValue();
        } catch (Throwable th) {
            return true;
        }
    }

    static {
        try {
            CLOSED_BY_WRITER = PipedInputStream.class.getDeclaredField("closedByWriter");
            CLOSED_BY_READER = PipedInputStream.class.getDeclaredField("closedByReader");
            CLOSED_BY_WRITER.setAccessible(true);
            CLOSED_BY_READER.setAccessible(true);
        } catch (Throwable th) {
            throw new Error(th);
        }
    }
}
