设计模式

Posted on Fri, Oct 14, 2022 learning

这几天通过在公司看代码,发现很多设计模式的使用,这里进行一下总结:

生产者-消费者模型

通过这几天的观察,这个主要用于异步消息写入或者服务间解耦,主要是异步。

package com.flowpp.pipe.operator.impl.writer.file;

import cn.hutool.core.io.FileUtil;
import com.alibaba.fastjson.JSONObject;
import com.flowpp.pipe.api.annotation.Operator;
import com.flowpp.pipe.api.decriptor.AttrDescriptor;
import com.flowpp.pipe.api.decriptor.InputDescriptor;
import com.flowpp.pipe.api.decriptor.MetaDescriptor;
import com.flowpp.pipe.api.decriptor.OutputDescriptor;
import com.flowpp.pipe.api.env.Env;
import com.flowpp.pipe.api.operator.define.writer.AbstractWriter;
import com.flowpp.pipe.api.utils.NamedThreadFactory;
import com.flowpp.pipe.common.constant.DataType;
import com.flowpp.pipe.operator.constant.OperatorConstant;
import com.flowpp.pipe.record.constant.FixedFieldDict;
import com.flowpp.pipe.record.message.AbstractNativeMsg;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueue;
import io.prometheus.client.Gauge;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.keyvalue.MultiKey;
import org.apache.commons.collections4.map.MultiKeyMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.StringEscapeUtils;

import java.io.Closeable;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import io.prometheus.client.Counter;

/**
 * FIXME TJ定制的输出算子,临时解决办法
 *
 * @author XFS
 * @version 1.0
 * @date 2022/10/10/ 21:50
 * @since 1.0
 */
@Slf4j
@Operator
public class TjFileWriter extends AbstractWriter<AbstractNativeMsg> {

    private static final String DESCRIPTION = "[TJ定制 - 写MainFile版本] 日志按要求输出到本地文件中";
    private static final int DEFAULT_SUB_FILE_INTERVAL = 1;
    public static final String DEPLOY_CITY_CODE = "deploy.city.code";

    public static final String PROMETHEUS_MONITOR_ENABLE = "prometheus.monitor.enable";

    private static final AtomicBoolean INIT_ONCE = new AtomicBoolean(false);
    private static final AtomicLong TOTAL = new AtomicLong(0);
    private static final String DEFAULT_WORK_DIR = "file_log";
    private static final String DEFAULT_FILE_PREFIX = "debug";
    private static final String DEFAULT_FILE_SUFFIX = "";
    private static final String MSG_TYPE_JSON = "json";

    private static final String MSG_TYPE_FORMATTED_STRING = "formatted_string";

    private static final String ATTR_SELECTED_FIELDS = "selectedFields";
    private static final String ATTR_MSG_TYPE = "msgType";
    private static final String ATTR_WORK_DIR = "workDir";
    private static final String ATTR_SUB_FILE_TYPE = "subFileType";
    private static final String ATTR_FILE_PREFIX = "filePrefix";

    private static final String ATTR_FILE_SUFFIX = "fileSuffix";

    private static final String ATTR_SEPARATOR_NAME = "separator";
    private static final String ATTR_SUB_FILE_INTERVAL = "subFileInterval";
    private static final String ATTR_RAWS_LIMIT_PER_FILE = "rows.limit.per.file";

    /**
     * 全局配置
     */
    private static final String ATTR_SYNC = "sync";
    private static final String ATTR_THREAD_NUM = "threadNum";
    private static final String ATTR_BLOCK = "block";

    private static final AttrDescriptor SELECTED_FIELDS;
    private static final AttrDescriptor MSG_TYPE;
    private static final AttrDescriptor WORK_DIR;
    private static final AttrDescriptor FILE_PREFIX;
    private static final AttrDescriptor FILE_SUFFIX;
    private static final AttrDescriptor SUB_FILE_TYPE;
    private static final AttrDescriptor SYNC;
    private static final AttrDescriptor THREAD_NUM;
    private static final AttrDescriptor BLOCK;

    private static final AttrDescriptor SEPARATOR;
    private static final AttrDescriptor SUB_FILE_INTERVAL;
    private static final AttrDescriptor ROWS_LIMIT_PER_FILE;

    private static final List<AttrDescriptor> ATTR_DESCRIPTORS = new ArrayList<>();

    /**
     * 监控指标
     */

    private static final String DEFAULT_NAMESPACE = "flowpipe";


    static {
        SELECTED_FIELDS = AttrDescriptor.builder()
                .name(ATTR_SELECTED_FIELDS)
                .dataType(DataType.ARRAY)
                .description("选择输出的字段列表,默认输出全部")
                .required(Boolean.FALSE)
                .defaultValue(Collections.emptyList())
                .build();
        MSG_TYPE = AttrDescriptor.builder()
                .name(ATTR_MSG_TYPE)
                .dataType(DataType.ENUM)
                .description("输出消息格式,当前支持json和formatted_string")
                .required(Boolean.FALSE)
                .modifiable(Boolean.FALSE)
                .defaultValue(MSG_TYPE_JSON)
                .optional(Lists.newArrayList(MSG_TYPE_JSON, MSG_TYPE_FORMATTED_STRING))
                .build();
        SEPARATOR = AttrDescriptor.builder()
                .name(ATTR_SEPARATOR_NAME)
                .dataType(DataType.STRING)
                .description("分隔符,默认 |, 消息输出格式为formatted_string时有效")
                .required(Boolean.FALSE)
                .defaultValue("|")
                .build();
        FILE_PREFIX = AttrDescriptor.builder()
                .name(ATTR_FILE_PREFIX)
                .dataType(DataType.STRING)
                .description("文件名前缀")
                .required(Boolean.FALSE)
                .defaultValue(DEFAULT_FILE_PREFIX)
                .build();
        FILE_SUFFIX = AttrDescriptor.builder()
                .name(ATTR_FILE_SUFFIX)
                .dataType(DataType.STRING)
                .description("文件名后缀")
                .required(Boolean.FALSE)
                .defaultValue(DEFAULT_FILE_SUFFIX)
                .build();
        SUB_FILE_TYPE = AttrDescriptor.builder()
                .name(ATTR_SUB_FILE_TYPE)
                .dataType(DataType.ENUM)
                .description("文件切分类型,默认不切分文件")
                .required(Boolean.FALSE)
                .optional(Lists.newArrayList(
                        OperatorConstant.TIME_UNIT_DAY,
                        OperatorConstant.TIME_UNIT_HOUR,
                        OperatorConstant.TIME_UNIT_MINUTE,
                        OperatorConstant.TIME_UNIT_SECOND,
                        OperatorConstant.TIME_UNIT_NONE))
                .defaultValue(OperatorConstant.TIME_UNIT_NONE)
                .build();
        SUB_FILE_INTERVAL = AttrDescriptor.builder()
                .name(ATTR_SUB_FILE_INTERVAL)
                .dataType(DataType.INT)
                .description("文件切分间隔,默认为1,与切分类型配合使用")
                .required(Boolean.FALSE)
                .defaultValue(DEFAULT_SUB_FILE_INTERVAL)
                .build();
        WORK_DIR = AttrDescriptor.builder()
                .name(ATTR_WORK_DIR)
                .dataType(DataType.STRING)
                .description("文件输出目录")
                .required(Boolean.FALSE)
                .defaultValue(DEFAULT_WORK_DIR)
                .build();
        ROWS_LIMIT_PER_FILE = AttrDescriptor.builder()
                .name(ATTR_RAWS_LIMIT_PER_FILE)
                .dataType(DataType.INT)
                .description("单个文件的条数限制")
                .required(Boolean.FALSE)
                .build();


        SYNC = AttrDescriptor.builder()
                .name(ATTR_SYNC)
                .dataType(DataType.BOOL)
                .description("是否同步写入方式,默认为true")
                .required(Boolean.FALSE)
                .global(Boolean.TRUE)
                .modifiable(Boolean.FALSE)
                .defaultValue(Boolean.TRUE)
                .build();
        THREAD_NUM = AttrDescriptor.builder()
                .name(ATTR_THREAD_NUM)
                .dataType(DataType.INT)
                .description("异步写文件线程数")
                .required(Boolean.FALSE)
                .global(Boolean.TRUE)
                .defaultValue(1)
                .build();
        BLOCK = AttrDescriptor.builder()
                .name(ATTR_BLOCK)
                .dataType(DataType.BOOL)
                .description("写入是否阻塞")
                .required(Boolean.FALSE)
                .global(Boolean.TRUE)
                .modifiable(Boolean.FALSE)
                .defaultValue(Boolean.TRUE)
                .build();

        ATTR_DESCRIPTORS.add(SELECTED_FIELDS);
        ATTR_DESCRIPTORS.add(MSG_TYPE);
        ATTR_DESCRIPTORS.add(SEPARATOR);
        ATTR_DESCRIPTORS.add(FILE_PREFIX);
        ATTR_DESCRIPTORS.add(FILE_SUFFIX);
        ATTR_DESCRIPTORS.add(SUB_FILE_TYPE);
        ATTR_DESCRIPTORS.add(SUB_FILE_INTERVAL);
        ATTR_DESCRIPTORS.add(WORK_DIR);
        ATTR_DESCRIPTORS.add(ROWS_LIMIT_PER_FILE);

        ATTR_DESCRIPTORS.add(SYNC);
        ATTR_DESCRIPTORS.add(THREAD_NUM);
        ATTR_DESCRIPTORS.add(BLOCK);

    }

    @Override
    public String description() {
        return DESCRIPTION;
    }

    @Override
    public InputDescriptor inputDescriptor() {
        return OperatorConstant.BASE_INPUT;
    }

    @Override
    public OutputDescriptor outputDescriptor() {
        return OperatorConstant.BASE_OUTPUT;
    }

    @Override
    public MetaDescriptor metaDescriptor() {
        return OperatorConstant.DEFAULT_META;
    }

    @Override
    public List<AttrDescriptor> attrDescriptors() {
        return ATTR_DESCRIPTORS;
    }


    private static final ExecutorService FILE_CLOSE_WORKER = new ThreadPoolExecutor(
            2,
            2,
            1L,
            TimeUnit.MINUTES,
            new LinkedBlockingQueue<>(),
            new NamedThreadFactory("file-close-worker")
    );


    /**
     * 局部属性
     */
    private List<String> selectedFields;
    private String filePrefix;
    private String fileSuffix;
    private String msgType;
    private String subFileType;
    private String workDir;
    private boolean shutdown;

    /**
     * 全局属性
     */
    private static Boolean attrSync;
    private static Boolean attrBlock = false;
    private static Integer threadNum;

    private static List<WriteWorker> writeWorks;
    private static WriteWorker syncWriteWorker;
    private static List<MpscArrayQueue<FileWriterItem>> queues;
    private static ExecutorService executor;

    private String separator;

    private Integer subFileInterval;
    private Integer rowsPerFile;

    private String deployCityCode;

    private Counter counterBCPCreate;

    private Counter counterBCPClose;

    private Counter counterMainFileCreate;

    private Counter counterMainFileClose;

    private Counter counterBCPBytes;

    private Counter counterMainFileBytes;

    private static final Map<String, Counter> COUNTER_ROSTER = new ConcurrentHashMap<>();


    @Override
    protected void init0() {
        selectedFields = Optional.ofNullable(attrsDefine.getStringList(ATTR_SELECTED_FIELDS))
                .orElse(Collections.emptyList());
        filePrefix = attrsDefine.getStringOrDefault(ATTR_FILE_PREFIX, DEFAULT_FILE_PREFIX);
        fileSuffix = attrsDefine.getStringOrDefault(ATTR_FILE_SUFFIX, DEFAULT_FILE_SUFFIX);
        msgType = attrsDefine.getStringOrDefault(ATTR_MSG_TYPE, MSG_TYPE_JSON);
        subFileType = attrsDefine.getStringOrDefault(ATTR_SUB_FILE_TYPE, "none");
        // 校验当前目录是否存在,如果不存在则进行创建
        workDir = attrsDefine.getStringOrDefault(ATTR_WORK_DIR, DEFAULT_WORK_DIR);
        if (Objects.requireNonNull(attrsDefine.getString(ATTR_MSG_TYPE)).equals(MSG_TYPE_FORMATTED_STRING)) {
            separator = StringEscapeUtils.unescapeJava(Optional.ofNullable(attrsDefine.getString(ATTR_SEPARATOR_NAME))
                    .orElseThrow(() -> new IllegalArgumentException("separator参数缺失")));
        }
        checkForCreateDir(workDir);

        subFileInterval = attrsDefine.getIntOrDefault(ATTR_SUB_FILE_INTERVAL, DEFAULT_SUB_FILE_INTERVAL);
        rowsPerFile = attrsDefine.getIntOrDefault(ATTR_RAWS_LIMIT_PER_FILE, 0);

        //bcpcount
        String counterKeyBCPCreate = getCounterKey(DEFAULT_NAMESPACE, filePrefix + "_bcp_create");
        String counterKeyBCPClose = getCounterKey(DEFAULT_NAMESPACE, filePrefix + "_bcp_close");
        String counterKeyMainFileCreate = getCounterKey(DEFAULT_NAMESPACE, filePrefix + "_mainfile_create");
        String counterKeyMainFileClose = getCounterKey(DEFAULT_NAMESPACE, filePrefix + "_mainfile_close");
        String counterKeyBCP = getCounterKey(DEFAULT_NAMESPACE,filePrefix + "_bcp_bytes");
        String counterKeyMainFile = getCounterKey(DEFAULT_NAMESPACE,filePrefix + "_mainfile_bytes");

        counterBCPCreate = COUNTER_ROSTER.computeIfAbsent(counterKeyBCPCreate, s -> {
           Counter.Builder builder = Counter.build()
                   .namespace(DEFAULT_NAMESPACE)
                   .name(filePrefix + "_bcp_create")
                   .help(counterKeyBCPCreate);
           return builder.register();
        });
        counterBCPClose = COUNTER_ROSTER.computeIfAbsent(counterKeyBCPClose, s -> {
           Counter.Builder builder = Counter.build()
                   .namespace(DEFAULT_NAMESPACE)
                   .name(filePrefix + "_bcp_close")
                   .help(counterKeyBCPClose);
           return builder.register();
        });
        counterMainFileCreate = COUNTER_ROSTER.computeIfAbsent(counterKeyMainFileCreate, s -> {
            Counter.Builder builder = Counter.build()
                    .namespace(DEFAULT_NAMESPACE)
                    .name(filePrefix + "_mainfile_create")
                    .help(counterKeyMainFileCreate);
            return builder.register();
        });
        counterMainFileClose = COUNTER_ROSTER.computeIfAbsent(counterKeyMainFileClose, s -> {
            Counter.Builder builder = Counter.build()
                    .namespace(DEFAULT_NAMESPACE)
                    .name(filePrefix + "_mainfile_close")
                    .help(counterKeyMainFileClose);
            return builder.register();
        });
        counterBCPBytes = COUNTER_ROSTER.computeIfAbsent(counterKeyBCP, s -> {
            Counter.Builder builder =  Counter.build()
                    .namespace(DEFAULT_NAMESPACE)
                    .name(filePrefix + "_bcp_bytes")
                    .help(counterKeyBCP);
            return builder.register();
        });
        counterMainFileBytes = COUNTER_ROSTER.computeIfAbsent(counterKeyMainFile, s -> {
                Counter.Builder builder = Counter.build()
                    .namespace(DEFAULT_NAMESPACE)
                    .name(filePrefix + "_mainfile_bytes")
                    .help(counterKeyMainFile)
                        .labelNames("method")    ;
            return builder.register();
        });

        initOnce();

    }


    /**
     * 执行只初始化一次的任务
     */
    private synchronized void initOnce() {
        if (INIT_ONCE.compareAndSet(false, true)) {
            //非同步的方式需要创建线程来进行数据写入
            attrSync = Optional.ofNullable(attrsDefine.getBoolean(ATTR_SYNC)).orElse(Boolean.FALSE);
            if (attrSync.equals(Boolean.FALSE)) {
                threadNum = Math.max(Optional.ofNullable(attrsDefine.getInt(ATTR_THREAD_NUM)).orElse(1), 1);
                attrBlock = Optional.ofNullable(attrsDefine.getBoolean(ATTR_BLOCK)).orElse(Boolean.FALSE);
                // 根据threadNum 初始化线程
                executor = new ThreadPoolExecutor(threadNum, threadNum, 30L, TimeUnit.SECONDS,
                        new SynchronousQueue<>(), new NamedThreadFactory("file-write-worker"));
                queues = new ArrayList<>(threadNum);
                writeWorks = new ArrayList<>(threadNum);
                for (int i = 0; i < threadNum; i++) {
                    final MpscArrayQueue<FileWriterItem> queue = new MpscArrayQueue<>(10240);
                    queues.add(queue);
                    WriteWorker writeWorker = new WriteWorker(queue, i);
                    writeWorks.add(writeWorker);
                    executor.execute(writeWorker);
                }
            } else {
                syncWriteWorker = new WriteWorker(null, 0);
            }
        }
    }

    private String getCounterKey(String namespace, String name) {
        return new StringJoiner("_")
                .add(namespace)
                .add(name)
                .toString();
    }

    private String getGaugeKey(String namespace, String name) {
        return new StringJoiner("_")
                .add(namespace)
                .add(name)
                .toString();
    }


    @Override
    public void write(AbstractNativeMsg msg) {
        if (shutdown) {
            return;
        }
        FileWriterItem writerItem = genWriteItem(msg);
        if (Boolean.TRUE.equals(attrSync)) {
            writeSync(writerItem);
        } else {
            writeAsync(writerItem);
        }
    }


    private void writeSync(FileWriterItem writerItem) {
        // 使用当前线程写入
        syncWriteWorker.tryWrite2File(writerItem);
    }


    private void writeAsync(FileWriterItem writerItem) {
        int sleep = 0;
        long total = TOTAL.getAndIncrement();
        int index = (int) (total % threadNum);
        final MpscArrayQueue<FileWriterItem> queue = queues.get(index);
        // 临时打印 == > 修改时间自适应打印。
        if ((total % (100_000) == 0)) {
            log.info("===>>> FileWriter消费队列中元素个数为:{}", Joiner.on(", ").join(queues.stream().map(Queue::size).collect(Collectors.toList())));
        }
        try {
            // block 为true 代表不能丢消息,需要阻塞等待队列有容量能够放入当前消息
            while (!queue.offer(writerItem) && Boolean.TRUE.equals(attrBlock)) {
                Thread.sleep(Math.min(++sleep, 8));
            }
        } catch (InterruptedException e) {
            exceptionHandle(e);
        }
    }


    private void exceptionHandle(InterruptedException e) {
        // 遇到中断异常,需要关闭当前写线程
        e.printStackTrace();
    }


    private FileWriterItem genWriteItem(AbstractNativeMsg msg) {
        FileWriterItem writerItem = new FileWriterItem();
        // 但是使用的是一份局部拷贝的新对象
        writerItem.setMsg(msg.partialClone());
        writerItem.setFilePrefix(filePrefix);
        writerItem.setFileSuffix(fileSuffix);
        writerItem.setSubFileType(subFileType);
        writerItem.setWorkDir(workDir);
        writerItem.setSelectedFields(selectedFields);
        writerItem.setMsgType(msgType);
        writerItem.setSep(this.separator);
        writerItem.setSubFileInterval(this.subFileInterval);
        writerItem.setRowsPerFile(this.rowsPerFile);

        //monitor
        writerItem.setCounterBCPCreate(counterBCPCreate);
        writerItem.setCounterBCPClose(counterBCPClose);
        writerItem.setCounterMainFileCreate(counterMainFileCreate);
        writerItem.setCounterMainFileClose(counterMainFileClose);
        writerItem.setCounterBCPBytes(counterBCPBytes);
        writerItem.setCounterMainFileBytes(counterMainFileBytes);

        return writerItem;
    }


    /**
     * 检查或创建文件夹
     */
    private synchronized void checkForCreateDir(String dirName) {
        File file = Paths.get(dirName).toFile();
        if (!file.exists()) {
            file.mkdirs();
        }
    }


    @Override
    public void shutdown() {
        shutdown = true;
        if (writeWorks != null && !writeWorks.isEmpty()) {
            writeWorks.forEach(WriteWorker::shutdown);
        }
        if (executor != null && !executor.isShutdown()) {
            executor.shutdown();
        }
    }


    //******************************************************************************************************************//
    //********************************************   WRITE  THREAD  START   ********************************************//
    //******************************************************************************************************************//

    /**
     * debug 日志写线程
     */
    private static class WriteWorker implements Runnable, Closeable {

        private static final int WORKER_STATUS_NEW = -1;
        private static final int WORKER_STATUS_RUNNING = 0;
        private static final int WORKER_STATUS_EXIT = 1;
        private static final int WAITING_MAX_TIMES = 1000;

        public static final String RAW_FILE_FILED = "_RAW_FILE_FIELD";
        public static final String RAW_FILE_ENABLE = "_RAW_FILE_ENABLE";
        public static final String RAW1_ENABLE = "_RAW1_ENABLE";
        public static final String RAW2_ENABLE = "_RAW2_ENABLE";
        public static final String RAW1_BASE64_ENABLE = "_RAW1_BASE64_ENABLE";
        public static final String RAW2_BASE64_ENABLE = "_RAW2_BASE64_ENABLE";
        private static final String TWO_LINE_SEP = "\n\n";
        private static final String FILE_NAME_SEPARATOR = "-";

        private static final ThreadLocal<StringBuilder> STRING_BUILDER_THREAD_LOCAL = ThreadLocal.withInitial(StringBuilder::new);
        private AtomicBoolean shutdown = new AtomicBoolean(false);
        private final MpscArrayQueue<FileWriterItem> queue;
        private final int index;

        private int status;

        private final AtomicLong fileCount = new AtomicLong(0);
        private final MultiKeyMap<String, WriterWrapper> writerMap = new MultiKeyMap<>();
        private final AtomicLong mainFileCount = new AtomicLong(0);

        private final String deployCityCode;

        private final Boolean prometheusEnable = Optional.ofNullable(Env.INSTANCE.getBoolean(PROMETHEUS_MONITOR_ENABLE)).orElse(false);



        public WriteWorker(MpscArrayQueue<FileWriterItem> queue, int index) {
            this.queue = queue;
            this.index = index;
            this.status = WORKER_STATUS_NEW;
            // 获取不成部署的地市区域编码
            this.deployCityCode = Optional.ofNullable(Env.INSTANCE.getString(DEPLOY_CITY_CODE))
                    .orElse("null");
        }

        @Override
        public void run() {
            int sleep = 1;
            status = WORKER_STATUS_RUNNING;
            while (!shutdown.get() && !Thread.interrupted()) {
                try {
                    boolean success = queue.drain(this::tryWrite2File) > 0;
                    if (!success && queue.isEmpty()) {
                        Thread.sleep(Math.min(sleep++, 32));
                    } else {
                        sleep = 1;
                    }
                } catch (Throwable e) {
                    log.error("write thread error: ", e);
                    // todo 异常处理,挂掉当前线程。
                }
            }
            close();
            status = WORKER_STATUS_EXIT;
            log.info("FileWrite Thread exit, stop:{}, interrupted:{}", shutdown, Thread.currentThread().isInterrupted());
        }


        @Override
        public void close() {
            try {
                // 如果 queue 中还有数据,尝试将剩余的数据写入到文件中
                log.info("{}开始执行关闭逻辑", Thread.currentThread().getName());
                if (!queue.isEmpty()) {
                    log.info("file-write-worker-[{}]缓存队列非空,剩余日志数:[{}], 尝试将队列中数据写入文件", index, queue.size());
                    queue.drain(this::tryWrite2File);
                }
                Iterator<Map.Entry<MultiKey<? extends String>, WriterWrapper>> iterator = writerMap.entrySet().iterator();
                while (iterator.hasNext()) {
                    Map.Entry<MultiKey<? extends String>, WriterWrapper> next = iterator.next();
                    WriterWrapper wrapper = next.getValue();
                    try {
                        // 关闭时使用同步方式进行关闭
                        wrapper.close();
                        if(prometheusEnable){
                            wrapper.getCounterBCPClose().inc();
                            wrapper.getCounterBCPBytes().inc(wrapper.getFile().length());
                        }
                        // 关闭后将元素移除
                        iterator.remove();
                    } catch (Throwable e) {
                        log.error("进程关闭,关闭文件[{}]出现异常:", wrapper.getFile().getAbsolutePath(), e);
                    }
                }
            } catch (Exception e) {
                log.info("{}关闭异常:", Thread.currentThread().getName(), e);
            }
        }


        public void shutdown() {
            if (shutdown.compareAndSet(false, true)) {
                int times = 0;
                try {
                    // 等待worker 线程正常退出或者达到最大等待时长
                    while (status != WORKER_STATUS_EXIT || times >= WAITING_MAX_TIMES) {
                        TimeUnit.MILLISECONDS.sleep(10);
                        times++;
                    }
                } catch (Exception e) {
                    log.info("file-write-worker-{}关闭异常:", index, e);
                }
            }
        }


        /**
         * 执行写文件逻辑
         */
        private void tryWrite2File(FileWriterItem writerItem) {
            // TODO 待优化 当前依靠新的日志来触发文件切换的条件
            WriterWrapper wrapper = getWriterOrCreate(writerItem);
            if (wrapper != null) {
                // 如果被重置了,需要再次获取wrapper
                if (resetWriterIfNeed(wrapper)) {
                    wrapper = getWriterOrCreate(writerItem);
                    if (wrapper == null) {
                        return;
                    }
                }
                try {
                    // 处理原始码流文件
                    dealWithRawFile(writerItem, wrapper);
                    // 写入bcp文件
                    String writeMsg = genWriteMsg(writerItem);
                    if (writeMsg != null) {
                        wrapper.writeRow(writeMsg);
                        // 暂时不需要重置 MAINFILE 字段,当前msg对象是一份拷贝,新增字段不会影响原对象
                    }
                } catch (IOException e) {
                    log.error("写入内容异常, fileName:[{}], error", wrapper.getFile().getAbsolutePath(), e);
                }
            }
        }

        /**
         * 生成mainfile
         *
         * @param writerItem
         * @param wrapper
         */
        private void dealWithRawFile(FileWriterItem writerItem, WriterWrapper wrapper) {
            AbstractNativeMsg msg = writerItem.getMsg();
            Boolean rawFileEnable = (Boolean) msg.get(RAW_FILE_ENABLE);
            if (rawFileEnable == null || !rawFileEnable) {
                return;
            }
            Boolean raw1Enable = (Boolean) msg.get(RAW1_ENABLE);
            Boolean raw2Enable = (Boolean) msg.get(RAW2_ENABLE);
            Boolean raw1Base64Enable = (Boolean) msg.get(RAW1_BASE64_ENABLE);
            Boolean raw2Base64Enable = (Boolean) msg.get(RAW2_BASE64_ENABLE);
            // 至少需要开启一个选项才进行写入
            if ((raw1Enable == null || !raw1Enable) && (raw2Enable == null || !raw2Enable) && (raw1Base64Enable == null || !raw1Base64Enable) && (raw2Base64Enable == null || !raw2Base64Enable)) {
                return;
            }
            //打点记一次
            if (prometheusEnable){
                wrapper.getCounterMainFileCreate().inc();
            }
            byte[] raw1 = (byte[]) writerItem.getMsg().get(FixedFieldDict.RAW_1);
            byte[] raw2 = (byte[]) writerItem.getMsg().get(FixedFieldDict.RAW_2);
            String rawFileField = (String) writerItem.getMsg().get(RAW_FILE_FILED);
            // 原始码流都不存在的情况下也不需要写mainfile
            // 如果已经填了mainfile,则需要处理(复制文件到正确的地方)
            if (!rawDataEnable(raw1) && !rawDataEnable(raw2) && (StringUtils.isEmpty(rawFileField) || StringUtils.isEmpty(writerItem.getMsg().getString(rawFileField)))) {
                return;
            }
            try2HandleMainFile(writerItem, wrapper, raw1Enable, raw2Enable, raw1Base64Enable, raw2Base64Enable, raw1, raw2);
        }


        private void try2HandleMainFile(FileWriterItem writerItem,
                                        WriterWrapper wrapper,
                                        Boolean raw1Enable,
                                        Boolean raw2Enable,
                                        Boolean raw1Base64Enable,
                                        Boolean raw2Base64Enable,
                                        byte[] raw1,
                                        byte[] raw2) {
            // 当前 mainfile 跟 bcp  文件在同一目录下,去除.bcp后缀的文件名作为rawFile的目录
            String filePath = wrapper.getFile().getAbsolutePath();
            int indexOf = filePath.lastIndexOf(wrapper.getFileSuffix());
            String mainFilePath = filePath.substring(0, indexOf);
            File mainFileDir = Paths.get(mainFilePath).toFile();
            if (!mainFileDir.exists()) {
                mainFileDir.mkdirs();
            }
            String mainFileName = genMainFileName(wrapper);
            File mainFile = Paths.get(mainFileDir.getAbsolutePath(), mainFileName).toFile();
            String rawFileField = (String) writerItem.getMsg().get(RAW_FILE_FILED);
            if (rawFileField != null) {
                try {
                    String existMainFilePath = (String) writerItem.getMsg().get(rawFileField);
                    if (StringUtils.isNotEmpty(existMainFilePath)) {
                        File f = Paths.get(existMainFilePath).toFile();
                        if (f.exists()) {
                            String existMainFileName = f.getName();
                            String existMainFileSuffix = existMainFileName.indexOf(".") > 0 ? existMainFileName.substring(existMainFileName.indexOf(".")) : "";
                            String targetMainFileName = mainFileName.indexOf(".") > 0 ? mainFileName.substring(0, mainFileName.indexOf(".")) : mainFileName;
                            mainFile = Paths.get(mainFileDir.getAbsolutePath(), targetMainFileName + existMainFileSuffix).toFile();
                            FileUtil.copy(f, mainFile, true);
                            //mainfile复制了一次,打点
                            if (prometheusEnable){
                                wrapper.getCounterMainFileClose().inc();
                                wrapper.getCounterMainFileBytes().labels("COPY").inc(mainFile.length());
                            }
                            if (mainFile.exists()) {
                                writerItem.getMsg().put(rawFileField, mainFile.getAbsolutePath());
                            }
                        } else {
                            log.error("rawFileField:{} has content:{} but no file exist", rawFileField, existMainFilePath);
                            writerItem.getMsg().put(rawFileField, null);
                        }
                        return;
                    } else {
                        try (java.io.FileWriter mainFileWriter = new FileWriter(mainFile)) {
                            //加了一个counter
                            writeMainFileContent(mainFileWriter, raw1Enable, raw2Enable, raw1Base64Enable, raw2Base64Enable, raw1, raw2);
                            mainFileWriter.flush();
                        }
                        //mainfile写了一个,打点
                        if (prometheusEnable){
                            wrapper.getCounterMainFileClose().inc();
                            wrapper.getCounterMainFileBytes().labels("WRITE").inc(mainFile.length());
                        }
                    }
                    // 进行 原始码流文件字段 赋值
                    writerItem.getMsg().put(rawFileField, mainFile.getAbsolutePath());
                } catch (IOException e) {
                    log.error("写MAINFILE异常,filePrefix:[{}], 异常:", writerItem.getFilePrefix(), e);
                }
            }
        }

        private void writeMainFileContent(FileWriter fileWriter,
                                          Boolean raw1Enable,
                                          Boolean raw2Enable,
                                          Boolean raw1Base64Enable,
                                          Boolean raw2Base64Enable,
                                          byte[] raw1,
                                          byte[] raw2) throws IOException {
            if (rawDataEnable(raw1)) {
                if (raw1Enable != null && raw1Enable) {
                    fileWriter.write(new String(raw1));
                    fileWriter.write(TWO_LINE_SEP);
                }
                if (raw1Base64Enable != null && raw1Base64Enable) {
                    String raw1Base64 = Base64.getEncoder().encodeToString(raw1);
                    fileWriter.write(raw1Base64);
                    fileWriter.write(TWO_LINE_SEP);
                }
            }
            if (rawDataEnable(raw2)) {
                if (raw2Enable != null && raw2Enable) {
                    fileWriter.write(new String(raw2));
                    fileWriter.write(TWO_LINE_SEP);
                }
                if (raw2Base64Enable != null && raw2Base64Enable) {
                    String raw2Base64 = Base64.getEncoder().encodeToString(raw2);
                    fileWriter.write(raw2Base64);
                    fileWriter.write(TWO_LINE_SEP);
                }
            }
        }


        private boolean rawDataEnable(byte[] raw) {
            return raw != null && raw.length > 0;
        }


        private String genMainFileName(WriterWrapper wrapper) {
            return new StringBuilder()
                    .append(wrapper.getFilePrefix()).append("_raw_")
                    .append(index).append("_")
                    .append(mainFileCount.incrementAndGet()).append("_")
                    .append(System.currentTimeMillis())
                    .append(".txt")
                    .toString();
        }


        private WriterWrapper getWriterOrCreate(FileWriterItem writerItem) {
            // 根据前缀和后缀获取,如果没有则新建
            WriterWrapper wrapper = writerMap.get(writerItem.getFilePrefix(), writerItem.getFileSuffix());
            if (wrapper == null) {
                try {
                    //wrapper = WriterWrapper.from(writerItem, index, fileCount.getAndIncrement(), true);
                    String fileNameForTJ = genFileNameForTJ(writerItem);
                    wrapper = WriterWrapper.from(writerItem, fileNameForTJ, true);
                    //新建了bcp文件,打点
                    if (prometheusEnable){
                        writerItem.getCounterBCPCreate().inc();
                        writerMap.put(writerItem.getFilePrefix(), writerItem.getFileSuffix(), wrapper);
                    }
                } catch (IOException e) {
                    log.error("获取writerWrapper异常,data:[{}], error:", writerItem, e);
                }
            }
            return wrapper;
        }


        private boolean resetWriterIfNeed(WriterWrapper wrapper) {
            if (wrapper.isExpire()) {
                // rest the cached map
                writerMap.put(wrapper.getFilePrefix(), wrapper.getFileSuffix(), null);
                tryFlush(wrapper);
                return true;
            }
            return false;
        }


        private void tryFlush(WriterWrapper wrapper) {
            if (wrapper != null && wrapper.getWriter() != null) {
                FILE_CLOSE_WORKER.execute(() -> {
                    try {
                        wrapper.close();
                        //同步消息
                        if (prometheusEnable){
                            wrapper.getCounterBCPClose().inc();
                            wrapper.getCounterBCPBytes().inc(wrapper.getFile().length());
                        }
                    } catch (IOException e) {
                        log.error("try flush file close writer error: ", e);
                    }
                });
            }
        }


        private String genWriteMsg(FileWriterItem writerItem) {
            String msgType = writerItem.getMsgType();
            switch (msgType) {
                case MSG_TYPE_JSON: {
                    return jsonTypeMsg(writerItem);
                }
                case MSG_TYPE_FORMATTED_STRING: {
                    return formattedTypeMsg(writerItem);
                }
                default:
                    throw new UnsupportedOperationException();
            }
        }

        private String formattedTypeMsg(FileWriterItem writerItem) {
            List<String> selectedFields = writerItem.getSelectedFields();
            if (selectedFields.isEmpty()) {
                return null;
            }
            final AbstractNativeMsg msg = writerItem.getMsg();
            StringBuilder stringBuilder = STRING_BUILDER_THREAD_LOCAL.get();
            stringBuilder.setLength(0);
            for (int i = 0; i < selectedFields.size() - 1; i++) {
                final String name = selectedFields.get(i);
                final Object value = msg.get(name);
                String s = replaceSensitiveChars(value, writerItem);
                stringBuilder.append(s).append(writerItem.getSep());
            }
            final Object finalValue = msg.get(selectedFields.get(selectedFields.size() - 1));
            String s = replaceSensitiveChars(finalValue, writerItem);
            stringBuilder.append(s);
            return stringBuilder.toString();
        }

        private String replaceSensitiveChars(Object object, FileWriterItem writerItem) {
            if (object == null) {
                return "";
            }
            String part = Objects.toString(object);
//            if (part.contains(writerItem.getSep())) {
//                part = part.replaceAll(writerItem.getSep(), URLUtil.encode(writerItem.getSep()));
//            }
            if (part.contains("\t")) {
                part = part.replaceAll("\t", "\\\\t");
            }
            if (part.contains("\r")) {
                part = part.replaceAll("\r", "\\\\r");
            }
            if (part.contains("\n")) {
                part = part.replaceAll("\n", "\\\\n");
            }
            return part;
        }


        private String jsonTypeMsg(FileWriterItem writerItem) {
            List<String> selectedFields = writerItem.getSelectedFields();
            AbstractNativeMsg msg = writerItem.getMsg();
            JSONObject logItem = new JSONObject(selectedFields.size());
            for (String selectedField : selectedFields) {
                logItem.put(selectedField, msg.get(selectedField));
            }
            return logItem.toJSONString();
        }


        /**
         * 根据TJ的规范生成fileName
         * 生成规则:
         * <pre>
         *     第一个固定111,第二个是地市区域编码,第三个是生成时间戳,第四个是随机数,第五个是大写类型名,第六个0.xx1_bq保留不动
         *     示例:111-511100-1620575830-06580-DATA_IM-0.xx1_bq.bcp
         * </pre>
         *
         * @return
         */
        private String genFileNameForTJ(FileWriterItem writerItem) {
            return new StringBuilder("111").append(FILE_NAME_SEPARATOR)
                    .append(deployCityCode).append(FILE_NAME_SEPARATOR)
                    .append(System.currentTimeMillis() / 1000).append(FILE_NAME_SEPARATOR)
                    .append(index).append("0").append(fileCount.incrementAndGet()).append(FILE_NAME_SEPARATOR)
                    .append(writerItem.getFilePrefix().toUpperCase()).append(FILE_NAME_SEPARATOR)
                    .append("xx1_bq")
                    .append(writerItem.getFileSuffix())
                    .toString();
        }
    }

    //******************************************************************************************************************//
    //********************************************   WRITE  THREAD   END    ********************************************//
    //******************************************************************************************************************//


}

这里面生成了一个静态内部类,用非阻塞、异步的方式处理IO。