package com.aliyun.iotx.edge.tunnel.core.common;

import com.aliyun.iotx.edge.tunnel.core.common.constant.CommunicationCode;
import com.aliyun.iotx.edge.tunnel.core.common.constant.CommunicationCodes;
import com.aliyun.iotx.edge.tunnel.core.common.constant.LoggerName;
import com.aliyun.iotx.edge.tunnel.core.common.exception.CommunicationException;
import com.aliyun.iotx.edge.tunnel.core.common.model.Identifier;
import com.aliyun.iotx.edge.tunnel.core.common.model.Message;
import com.aliyun.iotx.edge.tunnel.core.common.model.MessageHeader;
import com.aliyun.iotx.edge.tunnel.core.common.model.MessageType;
import com.aliyun.iotx.edge.tunnel.core.common.model.Payload;
import com.aliyun.iotx.edge.tunnel.core.common.model.Response;
import com.aliyun.iotx.edge.tunnel.core.common.quartz.MessageCallbackScheduler;
import com.aliyun.iotx.edge.tunnel.core.common.util.ChannelUtils;
import com.aliyun.iotx.edge.tunnel.core.common.util.CommunicationAssert;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/iotx-edge-tunnel-core-1.0.0-SNAPSHOT.jar:com/aliyun/iotx/edge/tunnel/core/common/AbstractWebSocketChannelHandler.class */
public abstract class AbstractWebSocketChannelHandler extends AbstractChannelHandler<WebSocketFrame> {
    private static final ThreadFactory NAMED_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("MESSAGE-CALLBACK-ASYNC-t-%d").build();
    private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(8, 128, 60, TimeUnit.SECONDS, new ArrayBlockingQueue(64), NAMED_THREAD_FACTORY);
    private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(LoggerName.TRACE);
    private static int count = 0;
    private final String schedulerGroup;
    private final List<byte[]> fragmentCache;
    private final Map<String, MessageCallback> callbacks;

    /* loaded from: input_file:BOOT-INF/lib/iotx-edge-tunnel-core-1.0.0-SNAPSHOT.jar:com/aliyun/iotx/edge/tunnel/core/common/AbstractWebSocketChannelHandler$SchedulerMessageCallBack.class */
    private static final class SchedulerMessageCallBack implements MessageCallback {
        private final MessageCallback target;
        private final Runnable sideEffect;

        private SchedulerMessageCallBack(MessageCallback messageCallback, Runnable runnable) {
            this.target = messageCallback;
            this.sideEffect = runnable;
        }

        @Override // com.aliyun.iotx.edge.tunnel.core.common.MessageCallback
        public void onSuccess(Message message, Response response) {
            throw new UnsupportedOperationException();
        }

        @Override // com.aliyun.iotx.edge.tunnel.core.common.MessageCallback
        public void onFailure(Message message, Response response) {
            throw new UnsupportedOperationException();
        }

        @Override // com.aliyun.iotx.edge.tunnel.core.common.MessageCallback
        public void onTimeout() throws Exception {
            this.target.onTimeout();
            this.sideEffect.run();
        }
    }

    public AbstractWebSocketChannelHandler() {
        StringBuilder append = new StringBuilder().append(getClass().getSimpleName()).append("#");
        int i = count;
        count = i + 1;
        this.schedulerGroup = append.append(i).toString();
        this.fragmentCache = Lists.newArrayList();
        this.callbacks = Maps.newConcurrentMap();
    }

    public final void addMessageCallback(String str, MessageCallback messageCallback) {
        this.callbacks.put(str, messageCallback);
        MessageCallbackScheduler.dispatch(str, this.schedulerGroup, new SchedulerMessageCallBack(messageCallback, () -> {
            this.callbacks.remove(str);
        }));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.netty.channel.SimpleChannelInboundHandler
    public final void channelRead0(ChannelHandlerContext channelHandlerContext, WebSocketFrame webSocketFrame) {
        Pair<WebSocketFrame, byte[]> readIntactWebSocketFrame = readIntactWebSocketFrame(webSocketFrame);
        if (readIntactWebSocketFrame == null) {
            return;
        }
        Message message = null;
        MessageHeader messageHeader = null;
        String str = null;
        String str2 = null;
        Throwable th = null;
        long nanoTime = System.nanoTime();
        try {
            try {
                try {
                    message = (Message) CommunicationAssert.assertSuccess(() -> {
                        return Message.parse((byte[]) readIntactWebSocketFrame.getValue());
                    }, CommunicationCodes.MESSAGE_FORMAT_ILLEGAL, true);
                    messageHeader = message.getMessageHeader();
                    str = messageHeader.toReadableJSONString();
                    str2 = channelRead0(channelHandlerContext, readIntactWebSocketFrame.getKey(), message).toAbstractInfo();
                    if (readIntactWebSocketFrame.getKey() != webSocketFrame) {
                        ReferenceCountUtil.release(readIntactWebSocketFrame.getKey());
                    }
                    if (str2 == null) {
                        str2 = getPayloadIfFailed(messageHeader, message);
                    }
                    Logger logger = TRACE_LOGGER;
                    Object[] objArr = new Object[9];
                    objArr[0] = getHandlerName();
                    objArr[1] = getChannel();
                    objArr[2] = getAuthType();
                    objArr[3] = getIdentifierId();
                    objArr[4] = Long.valueOf(System.currentTimeMillis());
                    objArr[5] = str;
                    objArr[6] = str2;
                    objArr[7] = 0 == 0 ? "success" : th.getMessage();
                    objArr[8] = getUseTime(System.nanoTime() - nanoTime);
                    logger.info("receive message. handler={}; channel={}; authType={}; identifierId={}; timestamp={}; messageHeader={}; payloadAbstract={}; errorMsg={}; useTime={}", objArr);
                } catch (Throwable th2) {
                    TRACE_LOGGER.error("unexpected error. handler={}; channel={}; authType={}; identifierId={}; messageHeader={}; errorMsg={}", getHandlerName(), getChannel(), getAuthType(), getIdentifierId(), str, th2.getMessage(), th2);
                    ChannelUtils.response(channelHandlerContext.channel(), messageHeader, CommunicationCode.serverErrorOf(th2.getMessage())).addListener2((GenericFutureListener<? extends Future<? super Void>>) ChannelUtils.CLOSE);
                    if (readIntactWebSocketFrame.getKey() != webSocketFrame) {
                        ReferenceCountUtil.release(readIntactWebSocketFrame.getKey());
                    }
                    if (str2 == null) {
                        str2 = getPayloadIfFailed(messageHeader, message);
                    }
                    Logger logger2 = TRACE_LOGGER;
                    Object[] objArr2 = new Object[9];
                    objArr2[0] = getHandlerName();
                    objArr2[1] = getChannel();
                    objArr2[2] = getAuthType();
                    objArr2[3] = getIdentifierId();
                    objArr2[4] = Long.valueOf(System.currentTimeMillis());
                    objArr2[5] = str;
                    objArr2[6] = str2;
                    objArr2[7] = th2 == null ? "success" : th2.getMessage();
                    objArr2[8] = getUseTime(System.nanoTime() - nanoTime);
                    logger2.info("receive message. handler={}; channel={}; authType={}; identifierId={}; timestamp={}; messageHeader={}; payloadAbstract={}; errorMsg={}; useTime={}", objArr2);
                }
            } catch (CommunicationException e) {
                TRACE_LOGGER.warn("communication error. handler={}; channel={}; authType={}; identifierId={}; messageHeader={}; errorMsg={}", getHandlerName(), getChannel(), getAuthType(), getIdentifierId(), str, e.getMessage(), e);
                if (e.getChannelFutureListener() != null) {
                    ChannelUtils.response(channelHandlerContext.channel(), messageHeader, e.getCode()).addListener2((GenericFutureListener<? extends Future<? super Void>>) e.getChannelFutureListener());
                } else {
                    ChannelUtils.response(channelHandlerContext.channel(), messageHeader, e.getCode());
                }
                if (readIntactWebSocketFrame.getKey() != webSocketFrame) {
                    ReferenceCountUtil.release(readIntactWebSocketFrame.getKey());
                }
                if (str2 == null) {
                    str2 = getPayloadIfFailed(messageHeader, message);
                }
                Logger logger3 = TRACE_LOGGER;
                Object[] objArr3 = new Object[9];
                objArr3[0] = getHandlerName();
                objArr3[1] = getChannel();
                objArr3[2] = getAuthType();
                objArr3[3] = getIdentifierId();
                objArr3[4] = Long.valueOf(System.currentTimeMillis());
                objArr3[5] = str;
                objArr3[6] = str2;
                objArr3[7] = e == null ? "success" : e.getMessage();
                objArr3[8] = getUseTime(System.nanoTime() - nanoTime);
                logger3.info("receive message. handler={}; channel={}; authType={}; identifierId={}; timestamp={}; messageHeader={}; payloadAbstract={}; errorMsg={}; useTime={}", objArr3);
            }
        } catch (Throwable th3) {
            if (readIntactWebSocketFrame.getKey() != webSocketFrame) {
                ReferenceCountUtil.release(readIntactWebSocketFrame.getKey());
            }
            if (str2 == null) {
                str2 = getPayloadIfFailed(messageHeader, message);
            }
            Logger logger4 = TRACE_LOGGER;
            Object[] objArr4 = new Object[9];
            objArr4[0] = getHandlerName();
            objArr4[1] = getChannel();
            objArr4[2] = getAuthType();
            objArr4[3] = getIdentifierId();
            objArr4[4] = Long.valueOf(System.currentTimeMillis());
            objArr4[5] = str;
            objArr4[6] = str2;
            objArr4[7] = 0 == 0 ? "success" : th.getMessage();
            objArr4[8] = getUseTime(System.nanoTime() - nanoTime);
            logger4.info("receive message. handler={}; channel={}; authType={}; identifierId={}; timestamp={}; messageHeader={}; payloadAbstract={}; errorMsg={}; useTime={}", objArr4);
            throw th3;
        }
    }

    protected abstract Payload channelRead0(ChannelHandlerContext channelHandlerContext, WebSocketFrame webSocketFrame, Message message) throws Exception;

    protected abstract Identifier getIdentifier();

    /* JADX INFO: Access modifiers changed from: protected */
    public final Payload onResponse(Message message) {
        String messageId = message.getMessageId();
        Response response = (Response) CommunicationAssert.assertSuccess(() -> {
            return Response.parse(message.getPayload());
        }, CommunicationCodes.MESSAGE_PAYLOAD_FORMAT_ILLEGAL, true);
        String readableJSONString = message.getMessageHeader().toReadableJSONString();
        if (!response.isSuccess()) {
            TRACE_LOGGER.warn("receive failed response. handler={}; channel={}; messageHeader={}; response={}", getHandlerName(), getChannel(), readableJSONString, response.toAbstractInfo());
        }
        MessageCallback remove = this.callbacks.remove(messageId);
        if (remove == null) {
            TRACE_LOGGER.debug("message miss callback. handler={}; channel={}; messageHeader={}; response={}", getHandlerName(), getChannel(), readableJSONString, response.toAbstractInfo());
            onResponse0(message, response);
            return response;
        }
        MessageCallbackScheduler.unDispatch(messageId, this.schedulerGroup);
        if (response.isSuccess()) {
            EXECUTOR.execute(() -> {
                try {
                    remove.onSuccess(message, response);
                } catch (Throwable th) {
                    TRACE_LOGGER.error("message callback 'onSuccess' catch exception. handler={}; channel={}; messageHeader={}; errorMsg={}", getHandlerName(), getChannel(), readableJSONString, th.getMessage(), th);
                }
            });
        } else {
            EXECUTOR.execute(() -> {
                try {
                    remove.onFailure(message, response);
                } catch (Throwable th) {
                    TRACE_LOGGER.error("message callback 'onFailure' catch exception. handler={}; channel={}; messageHeader={}; errorMsg={}", getHandlerName(), getChannel(), readableJSONString, th.getMessage(), th);
                }
            });
        }
        onResponse0(message, response);
        return response;
    }

    protected void onResponse0(Message message, Response response) {
    }

    private Pair<WebSocketFrame, byte[]> readIntactWebSocketFrame(WebSocketFrame webSocketFrame) {
        byte[] bArr;
        if (webSocketFrame instanceof TextWebSocketFrame) {
            bArr = ((TextWebSocketFrame) webSocketFrame).text().getBytes();
        } else if (webSocketFrame instanceof BinaryWebSocketFrame) {
            ByteBuf content = ((BinaryWebSocketFrame) webSocketFrame).content();
            bArr = new byte[content.readableBytes()];
            content.getBytes(0, bArr);
        } else {
            if (!(webSocketFrame instanceof ContinuationWebSocketFrame)) {
                if (webSocketFrame instanceof PingWebSocketFrame) {
                    TRACE_LOGGER.info("receive 'PingWebSocketFrame'. handler={}; channel={}", getHandlerName(), getChannel());
                    return null;
                }
                if (webSocketFrame instanceof PongWebSocketFrame) {
                    TRACE_LOGGER.info("receive 'PongWebSocketFrame'. handler={}; channel={}", getHandlerName(), getChannel());
                    return null;
                }
                if (!(webSocketFrame instanceof CloseWebSocketFrame)) {
                    throw new UnsupportedOperationException("unsupported WebSocketFrame's type. type='" + webSocketFrame.getClass() + "'");
                }
                TRACE_LOGGER.info("receive webSocket 'CloseWebSocketFrame'. handler={}; channel={}", getHandlerName(), getChannel());
                return null;
            }
            ByteBuf content2 = ((ContinuationWebSocketFrame) webSocketFrame).content();
            bArr = new byte[content2.readableBytes()];
            content2.getBytes(0, bArr);
        }
        if (webSocketFrame.isFinalFragment() && this.fragmentCache.isEmpty()) {
            return new ImmutablePair(webSocketFrame, bArr);
        }
        if (!webSocketFrame.isFinalFragment()) {
            TRACE_LOGGER.info("receive fragments webSocketFrame. handler={}; channel={}", getHandlerName(), getChannel());
            this.fragmentCache.add(bArr);
            return null;
        }
        int i = 0;
        Iterator<byte[]> it = this.fragmentCache.iterator();
        while (it.hasNext()) {
            i += it.next().length;
        }
        byte[] bArr2 = new byte[i + bArr.length];
        int i2 = 0;
        for (byte[] bArr3 : this.fragmentCache) {
            System.arraycopy(bArr3, 0, bArr2, i2, bArr3.length);
            i2 += bArr3.length;
        }
        System.arraycopy(bArr, 0, bArr2, i2, bArr.length);
        this.fragmentCache.clear();
        ByteBuf buffer = Unpooled.buffer();
        buffer.writeBytes(bArr2);
        return new ImmutablePair(new BinaryWebSocketFrame(buffer), bArr2);
    }

    private String getPayloadIfFailed(MessageHeader messageHeader, Message message) {
        if (messageHeader == null || message == null) {
            return null;
        }
        MessageType typeOf = MessageType.typeOf(message.getMessageHeader().getMessageType());
        byte[] payload = message.getPayload();
        switch (typeOf) {
            case MSG_BACKEND_CONN_REQ:
            case MSG_FRONTEND_CONN_REQ:
            case MSG_LOGIN:
                return new String(payload, Charset.defaultCharset());
            default:
                return null;
        }
    }

    private String getIdentifierId() {
        if (getIdentifier() == null) {
            return null;
        }
        return getIdentifier().getId();
    }

    private String getAuthType() {
        if (getIdentifier() == null) {
            return null;
        }
        return getIdentifier().getType().getType();
    }
}
