package com.aliyun.iotx.edge.tunnel.core.common.io;

import com.aliyun.iotx.edge.tunnel.core.common.constant.LoggerName;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/iotx-edge-tunnel-core-1.0.0-SNAPSHOT.jar:com/aliyun/iotx/edge/tunnel/core/common/io/StreamWorker.class */
/**
 * A worker that drains its own task queue on a dedicated pool thread.
 *
 * <p>When this class is initialized, {@link #NUM_WORKER} workers are created and each one is
 * submitted to a fixed-size thread pool with the same number of threads. Every worker owns a
 * separate FIFO queue and executes the tasks offered to it one at a time, in the order in which
 * they were queued. Callers obtain a worker through {@link #dispatch()} and hand it work through
 * {@link #offerTask(Runnable)}.
 *
 * <p>Decompiler notes in the original source indicate that the constructor, {@link #dispatch()}
 * and {@link #offerTask(Runnable)} were package-private before decompilation.
 */
public class StreamWorker implements Runnable {
    /** Number of worker instances; also the core and maximum size of the thread pool. */
    private static final int NUM_WORKER = 10;
    private static final Logger ERROR_LOGGER = LoggerFactory.getLogger(LoggerName.ERROR);
    private static final ThreadFactory NAMED_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("STREAM-WORKER-ASYNC-t-%d").build();
    /** Pool whose threads are each occupied by one never-returning worker loop. */
    private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(NUM_WORKER, NUM_WORKER, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), NAMED_THREAD_FACTORY);
    /** All workers; filled once by the static initializer and only read afterwards. */
    private static final List<StreamWorker> STREAM_WORKERS = Lists.newArrayList();
    private static final Random RANDOM = new Random();

    /** Tasks waiting to be executed by this worker; unbounded, FIFO. */
    private final BlockingQueue<StreamTask> taskQueue = new LinkedBlockingQueue<>();

    /** Pairs a queued task with the promise that is finished once the task has run. */
    private static final class StreamTask {
        private final StreamPromise promise;
        private final Runnable task;

        private StreamTask(StreamPromise streamPromise, Runnable runnable) {
            this.promise = streamPromise;
            this.task = runnable;
        }

        /** Returns the promise associated with this task. */
        public StreamPromise getPromise() {
            return this.promise;
        }

        /** Returns the work to execute. */
        public Runnable getTask() {
            return this.task;
        }
    }

    StreamWorker() {
    }

    /**
     * Picks one of the pre-created workers uniformly at random.
     *
     * @return a worker that is already running on a pool thread
     */
    public static StreamWorker dispatch() {
        // Bound by the actual list size so the index can never run past the list.
        return STREAM_WORKERS.get(RANDOM.nextInt(STREAM_WORKERS.size()));
    }

    /**
     * Queues a task for execution on this worker.
     *
     * @param runnable the work to run on this worker's thread
     * @return a new promise on which {@code finish()} is called after the task has run,
     *         whether it completed normally or threw
     * @throws RejectedExecutionException if the queue refuses the task
     */
    public StreamPromise offerTask(Runnable runnable) throws RejectedExecutionException {
        StreamTask streamTask = new StreamTask(new StreamPromise(), runnable);
        if (this.taskQueue.offer(streamTask)) {
            return streamTask.promise;
        }
        throw new RejectedExecutionException("stream task is rejected");
    }

    /**
     * Runs queued tasks forever, one at a time.
     *
     * <p>Any {@link Throwable} raised while waiting for or executing a task is logged and the
     * loop continues, so a failing task does not stop the worker.
     */
    @Override
    public void run() {
        while (true) {
            try {
                StreamTask next = this.taskQueue.take();
                try {
                    next.getTask().run();
                } finally {
                    // Finish the promise on both the success and the failure path.
                    next.getPromise().finish();
                }
            } catch (Throwable t) {
                // NOTE(review): this also catches the InterruptedException thrown by take(), so
                // the worker cannot be stopped by interrupting its thread - confirm that is intended.
                ERROR_LOGGER.error("stream worker catch unexpected error. errorMsg={}", t.getMessage(), t);
            }
        }
    }

    static {
        // One worker per pool thread; each execute() call occupies a thread permanently.
        for (int i = 0; i < NUM_WORKER; i++) {
            StreamWorker streamWorker = new StreamWorker();
            STREAM_WORKERS.add(streamWorker);
            EXECUTOR.execute(streamWorker);
        }
    }
}
