package com.aliyun.iotx.edge.tunnel.core.frontend;

import com.aliyun.iotx.edge.tunnel.core.base.BaseAgent;
import com.aliyun.iotx.edge.tunnel.core.common.model.Connection;
import com.aliyun.iotx.edge.tunnel.core.common.model.Message;
import com.aliyun.iotx.edge.tunnel.core.common.model.MessageHeader;
import com.aliyun.iotx.edge.tunnel.core.common.model.MessageType;
import com.aliyun.iotx.edge.tunnel.core.common.model.ServiceType;
import com.aliyun.iotx.edge.tunnel.core.common.quartz.HeartBeat;
import com.aliyun.iotx.edge.tunnel.core.common.util.ChannelUtils;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.Queue;
import java.util.UUID;

/* loaded from: input_file:BOOT-INF/lib/iotx-edge-tunnel-core-1.0.0-SNAPSHOT.jar:com/aliyun/iotx/edge/tunnel/core/frontend/BaseFrontendServiceHandler.class */
public abstract class BaseFrontendServiceHandler extends SimpleChannelInboundHandler<Object> {
    private static final int MAX_BYTES = 50000;
    protected final String serverUrl;
    protected final String backendServiceIp;
    protected final int backendServicePort;
    private Channel frontendAgentChannel;
    private final Queue<byte[]> messageCache = Lists.newLinkedList();
    private volatile boolean isConnected = false;

    /* JADX INFO: Access modifiers changed from: protected */
    public BaseFrontendServiceHandler(String str, String str2, int i) {
        this.serverUrl = str;
        this.backendServiceIp = str2;
        this.backendServicePort = i;
    }

    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        BaseAgent agent = getAgent(this.serverUrl);
        String uuid = UUID.randomUUID().toString();
        this.frontendAgentChannel = agent.connect(new FrontendAgentHandler(channelHandlerContext.channel(), uuid, this::finishConnect), 120);
        sendConnectMessage(uuid);
    }

    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
    public void channelInactive(ChannelHandlerContext channelHandlerContext) {
        if (this.frontendAgentChannel != null) {
            ChannelUtils.close(this.frontendAgentChannel, this, "due to frontend service closed");
        }
    }

    @Override // io.netty.channel.SimpleChannelInboundHandler
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object obj) {
        ByteBuf byteBuf = (ByteBuf) obj;
        byte[] bArr = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(bArr);
        if (this.isConnected) {
            if (!this.messageCache.isEmpty()) {
                flushCachedMessage();
            }
            syncMessage(bArr);
        } else {
            synchronized (this) {
                if (!this.isConnected) {
                    this.messageCache.add(bArr);
                }
            }
        }
    }

    protected abstract BaseAgent getAgent(String str);

    protected abstract Connection getConnection() throws Exception;

    private void sendConnectMessage(String str) throws Exception {
        MessageHeader build = MessageHeader.builder().messageType(MessageType.MSG_FRONTEND_CONN_REQ.type()).serviceType(ServiceType.DEFAULT.type()).messageId(str).timestamp(System.currentTimeMillis()).build();
        ChannelUtils.writeAndFlushWithFailureListener(this.frontendAgentChannel, build, Message.valueOf(build, getConnection().serialize()).toBinaryWebSocketFrame());
    }

    private synchronized void finishConnect() {
        flushCachedMessage();
        HeartBeat.register(this.frontendAgentChannel);
        this.isConnected = true;
    }

    private void flushCachedMessage() {
        while (!this.messageCache.isEmpty()) {
            syncMessage(this.messageCache.poll());
        }
    }

    private void syncMessage(byte[] bArr) {
        int length = bArr.length;
        while (true) {
            int i = length;
            if (i <= 0) {
                return;
            }
            int i2 = i > MAX_BYTES ? MAX_BYTES : i;
            byte[] bArr2 = new byte[i2];
            System.arraycopy(bArr, bArr.length - i, bArr2, 0, i2);
            MessageHeader build = MessageHeader.builder().messageType(MessageType.MSG_FROM_FRONTEND_TO_BACKEND.type()).serviceType(ServiceType.DEFAULT.type()).messageId(UUID.randomUUID().toString()).timestamp(System.currentTimeMillis()).build();
            ChannelUtils.writeAndFlushWithFailureListener(this.frontendAgentChannel, build, Message.valueOf(build, bArr2).toBinaryWebSocketFrame());
            length = i - i2;
        }
    }
}
