这几天在公司阅读代码时,发现其中用到了很多设计模式,在这里做一下总结:
- 单例模式
- 饿汉
/**
 * Eager singleton: the only instance is created when the class is initialized
 * and handed out unchanged by {@link #getLbw()}.
 */
public class Lubenwei {

    /** The single instance, built during class initialization. */
    private static final Lubenwei INSTANCE = new Lubenwei();

    /** Private so that no other class can instantiate this type. */
    private Lubenwei() {
    }

    /**
     * @return the shared instance
     */
    public static Lubenwei getLbw() {
        return INSTANCE;
    }
}
- 懒汉
/**
 * Lazy singleton: the instance is created on the first call to
 * {@link #getLbw()}; the accessor is synchronized so only one instance is ever built.
 */
public class Lubenwei {

    /** Created on first use; {@code null} until then. */
    private static Lubenwei instance;

    /** Private so that no other class can instantiate this type. */
    private Lubenwei() {
    }

    /**
     * @return the shared instance, creating it on the first call
     */
    public static synchronized Lubenwei getLbw() {
        if (instance != null) {
            return instance;
        }
        instance = new Lubenwei();
        return instance;
    }
}
- 双重检测
/**
 * Lazy singleton using a null check outside and inside a class-level lock,
 * with the instance held in a volatile field.
 */
public class Lubenwei {

    /** Volatile so a fully constructed instance is visible to all threads. */
    private static volatile Lubenwei lubenwei;

    /** Private so that no other class can instantiate this type. */
    private Lubenwei() {
    }

    /**
     * @return the shared instance, creating it on the first call
     */
    public static Lubenwei getLubenwei() {
        Lubenwei current = lubenwei;
        if (current == null) {
            synchronized (Lubenwei.class) {
                // Re-check under the lock: another thread may have created it meanwhile.
                current = lubenwei;
                if (current == null) {
                    current = new Lubenwei();
                    lubenwei = current;
                }
            }
        }
        return current;
    }
}
- 枚举(请使用这个)
说明一下:项目里用到的单例全部采用这种写法。枚举单例不仅能避免多线程同步问题,还自动支持序列化机制,可以防止反序列化时重新创建新的对象。
/**
 * Enum-based singleton: the enum has exactly one constant, which is the
 * single instance of this type.
 */
public enum Lubenwei {
    /** The single, globally unique instance. */
    lubenwei;
}
- 饿汉
生产者-消费者模型
通过这几天的观察,这个模型主要用于异步消息写入或者服务间解耦,其中最主要的用途是异步处理。下面是项目中的一个例子:
package com.flowpp.pipe.operator.impl.writer.file;
import cn.hutool.core.io.FileUtil;
import com.alibaba.fastjson.JSONObject;
import com.flowpp.pipe.api.annotation.Operator;
import com.flowpp.pipe.api.decriptor.AttrDescriptor;
import com.flowpp.pipe.api.decriptor.InputDescriptor;
import com.flowpp.pipe.api.decriptor.MetaDescriptor;
import com.flowpp.pipe.api.decriptor.OutputDescriptor;
import com.flowpp.pipe.api.env.Env;
import com.flowpp.pipe.api.operator.define.writer.AbstractWriter;
import com.flowpp.pipe.api.utils.NamedThreadFactory;
import com.flowpp.pipe.common.constant.DataType;
import com.flowpp.pipe.operator.constant.OperatorConstant;
import com.flowpp.pipe.record.constant.FixedFieldDict;
import com.flowpp.pipe.record.message.AbstractNativeMsg;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueue;
import io.prometheus.client.Gauge;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.keyvalue.MultiKey;
import org.apache.commons.collections4.map.MultiKeyMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.StringEscapeUtils;
import java.io.Closeable;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import io.prometheus.client.Counter;
/**
* FIXME TJ定制的输出算子,临时解决办法
*
* @author XFS
* @version 1.0
* @date 2022/10/10/ 21:50
* @since 1.0
*/
@Slf4j
@Operator
public class TjFileWriter extends AbstractWriter<AbstractNativeMsg> {
// Text returned by description().
private static final String DESCRIPTION = "[TJ定制 - 写MainFile版本] 日志按要求输出到本地文件中";
// Fallback for the "subFileInterval" attribute.
private static final int DEFAULT_SUB_FILE_INTERVAL = 1;
// Env keys read by WriteWorker: deployment city code and the Prometheus switch.
public static final String DEPLOY_CITY_CODE = "deploy.city.code";
public static final String PROMETHEUS_MONITOR_ENABLE = "prometheus.monitor.enable";
// Guards the one-time static initialization performed by initOnce().
private static final AtomicBoolean INIT_ONCE = new AtomicBoolean(false);
// Running count of async writes; used to pick a queue round-robin in writeAsync().
private static final AtomicLong TOTAL = new AtomicLong(0);
// Fallback values used when the corresponding attribute is not configured.
private static final String DEFAULT_WORK_DIR = "file_log";
private static final String DEFAULT_FILE_PREFIX = "debug";
private static final String DEFAULT_FILE_SUFFIX = "";
// Supported values of the "msgType" attribute.
private static final String MSG_TYPE_JSON = "json";
private static final String MSG_TYPE_FORMATTED_STRING = "formatted_string";
// Attribute names (per operator instance).
private static final String ATTR_SELECTED_FIELDS = "selectedFields";
private static final String ATTR_MSG_TYPE = "msgType";
private static final String ATTR_WORK_DIR = "workDir";
private static final String ATTR_SUB_FILE_TYPE = "subFileType";
private static final String ATTR_FILE_PREFIX = "filePrefix";
private static final String ATTR_FILE_SUFFIX = "fileSuffix";
private static final String ATTR_SEPARATOR_NAME = "separator";
private static final String ATTR_SUB_FILE_INTERVAL = "subFileInterval";
// NOTE(review): the constant name says RAWS, the attribute key says rows — likely a typo in the name.
private static final String ATTR_RAWS_LIMIT_PER_FILE = "rows.limit.per.file";
/**
* Global configuration (attribute names whose descriptors are marked global).
*/
private static final String ATTR_SYNC = "sync";
private static final String ATTR_THREAD_NUM = "threadNum";
private static final String ATTR_BLOCK = "block";
// Attribute descriptors; all of them are assigned in the static initializer.
private static final AttrDescriptor SELECTED_FIELDS;
private static final AttrDescriptor MSG_TYPE;
private static final AttrDescriptor WORK_DIR;
private static final AttrDescriptor FILE_PREFIX;
private static final AttrDescriptor FILE_SUFFIX;
private static final AttrDescriptor SUB_FILE_TYPE;
private static final AttrDescriptor SYNC;
private static final AttrDescriptor THREAD_NUM;
private static final AttrDescriptor BLOCK;
private static final AttrDescriptor SEPARATOR;
private static final AttrDescriptor SUB_FILE_INTERVAL;
private static final AttrDescriptor ROWS_LIMIT_PER_FILE;
// Filled by the static initializer and returned as-is by attrDescriptors().
private static final List<AttrDescriptor> ATTR_DESCRIPTORS = new ArrayList<>();
/**
* Monitoring metrics: namespace used for every Prometheus counter of this operator.
*/
private static final String DEFAULT_NAMESPACE = "flowpipe";
// Builds one descriptor per supported attribute and registers them, in
// declaration order, in ATTR_DESCRIPTORS.
static {
    // ---- message content -------------------------------------------------
    SELECTED_FIELDS = AttrDescriptor.builder()
            .name(ATTR_SELECTED_FIELDS)
            .dataType(DataType.ARRAY)
            .description("选择输出的字段列表,默认输出全部")
            .required(Boolean.FALSE)
            .defaultValue(Collections.emptyList())
            .build();

    MSG_TYPE = AttrDescriptor.builder()
            .name(ATTR_MSG_TYPE)
            .dataType(DataType.ENUM)
            .description("输出消息格式,当前支持json和formatted_string")
            .required(Boolean.FALSE)
            .modifiable(Boolean.FALSE)
            .defaultValue(MSG_TYPE_JSON)
            .optional(Lists.newArrayList(MSG_TYPE_JSON, MSG_TYPE_FORMATTED_STRING))
            .build();

    SEPARATOR = AttrDescriptor.builder()
            .name(ATTR_SEPARATOR_NAME)
            .dataType(DataType.STRING)
            .description("分隔符,默认 |, 消息输出格式为formatted_string时有效")
            .required(Boolean.FALSE)
            .defaultValue("|")
            .build();

    // ---- file naming and rolling -------------------------------------------
    FILE_PREFIX = AttrDescriptor.builder()
            .name(ATTR_FILE_PREFIX)
            .dataType(DataType.STRING)
            .description("文件名前缀")
            .required(Boolean.FALSE)
            .defaultValue(DEFAULT_FILE_PREFIX)
            .build();

    FILE_SUFFIX = AttrDescriptor.builder()
            .name(ATTR_FILE_SUFFIX)
            .dataType(DataType.STRING)
            .description("文件名后缀")
            .required(Boolean.FALSE)
            .defaultValue(DEFAULT_FILE_SUFFIX)
            .build();

    SUB_FILE_TYPE = AttrDescriptor.builder()
            .name(ATTR_SUB_FILE_TYPE)
            .dataType(DataType.ENUM)
            .description("文件切分类型,默认不切分文件")
            .required(Boolean.FALSE)
            .optional(Lists.newArrayList(
                    OperatorConstant.TIME_UNIT_DAY,
                    OperatorConstant.TIME_UNIT_HOUR,
                    OperatorConstant.TIME_UNIT_MINUTE,
                    OperatorConstant.TIME_UNIT_SECOND,
                    OperatorConstant.TIME_UNIT_NONE))
            .defaultValue(OperatorConstant.TIME_UNIT_NONE)
            .build();

    SUB_FILE_INTERVAL = AttrDescriptor.builder()
            .name(ATTR_SUB_FILE_INTERVAL)
            .dataType(DataType.INT)
            .description("文件切分间隔,默认为1,与切分类型配合使用")
            .required(Boolean.FALSE)
            .defaultValue(DEFAULT_SUB_FILE_INTERVAL)
            .build();

    WORK_DIR = AttrDescriptor.builder()
            .name(ATTR_WORK_DIR)
            .dataType(DataType.STRING)
            .description("文件输出目录")
            .required(Boolean.FALSE)
            .defaultValue(DEFAULT_WORK_DIR)
            .build();

    ROWS_LIMIT_PER_FILE = AttrDescriptor.builder()
            .name(ATTR_RAWS_LIMIT_PER_FILE)
            .dataType(DataType.INT)
            .description("单个文件的条数限制")
            .required(Boolean.FALSE)
            .build();

    // ---- global (shared by all instances) ----------------------------------
    SYNC = AttrDescriptor.builder()
            .name(ATTR_SYNC)
            .dataType(DataType.BOOL)
            .description("是否同步写入方式,默认为true")
            .required(Boolean.FALSE)
            .global(Boolean.TRUE)
            .modifiable(Boolean.FALSE)
            .defaultValue(Boolean.TRUE)
            .build();

    THREAD_NUM = AttrDescriptor.builder()
            .name(ATTR_THREAD_NUM)
            .dataType(DataType.INT)
            .description("异步写文件线程数")
            .required(Boolean.FALSE)
            .global(Boolean.TRUE)
            .defaultValue(1)
            .build();

    BLOCK = AttrDescriptor.builder()
            .name(ATTR_BLOCK)
            .dataType(DataType.BOOL)
            .description("写入是否阻塞")
            .required(Boolean.FALSE)
            .global(Boolean.TRUE)
            .modifiable(Boolean.FALSE)
            .defaultValue(Boolean.TRUE)
            .build();

    // Registration order is the order in which the descriptors are exposed.
    ATTR_DESCRIPTORS.addAll(Arrays.asList(
            SELECTED_FIELDS,
            MSG_TYPE,
            SEPARATOR,
            FILE_PREFIX,
            FILE_SUFFIX,
            SUB_FILE_TYPE,
            SUB_FILE_INTERVAL,
            WORK_DIR,
            ROWS_LIMIT_PER_FILE,
            SYNC,
            THREAD_NUM,
            BLOCK));
}
/**
* @return the fixed description text of this operator
*/
@Override
public String description() {
return DESCRIPTION;
}
/**
* @return the shared {@code OperatorConstant.BASE_INPUT} descriptor
*/
@Override
public InputDescriptor inputDescriptor() {
return OperatorConstant.BASE_INPUT;
}
/**
* @return the shared {@code OperatorConstant.BASE_OUTPUT} descriptor
*/
@Override
public OutputDescriptor outputDescriptor() {
return OperatorConstant.BASE_OUTPUT;
}
/**
* @return the shared {@code OperatorConstant.DEFAULT_META} descriptor
*/
@Override
public MetaDescriptor metaDescriptor() {
return OperatorConstant.DEFAULT_META;
}
/**
* @return the static descriptor list itself (not a copy), so callers share one mutable list
*/
@Override
public List<AttrDescriptor> attrDescriptors() {
return ATTR_DESCRIPTORS;
}
// Two-thread pool with an unbounded queue; WriteWorker.tryFlush() uses it to close expired writers.
// NOTE(review): nothing in this file shuts this pool down — confirm that is intended.
private static final ExecutorService FILE_CLOSE_WORKER = new ThreadPoolExecutor(
2,
2,
1L,
TimeUnit.MINUTES,
new LinkedBlockingQueue<>(),
new NamedThreadFactory("file-close-worker")
);
/**
* Per-instance attributes, filled by init0().
*/
private List<String> selectedFields;
private String filePrefix;
private String fileSuffix;
private String msgType;
private String subFileType;
private String workDir;
// Set by shutdown(); write() drops messages once it is true.
// NOTE(review): not volatile — confirm whether write() and shutdown() can run on different threads.
private boolean shutdown;
/**
* Global attributes, shared by all instances and assigned once in initOnce().
*/
private static Boolean attrSync;
private static Boolean attrBlock = false;
private static Integer threadNum;
// Async mode: one worker and one queue per writer thread (same index in both lists).
private static List<WriteWorker> writeWorks;
// Sync mode: the single worker whose tryWrite2File() is called on the caller's thread.
private static WriteWorker syncWriteWorker;
private static List<MpscArrayQueue<FileWriterItem>> queues;
private static ExecutorService executor;
// Only assigned in init0() when msgType is formatted_string.
private String separator;
private Integer subFileInterval;
private Integer rowsPerFile;
// NOTE(review): never assigned or read in the visible code; WriteWorker keeps its own deployCityCode.
private String deployCityCode;
// Prometheus counters for this instance's file prefix, resolved in init0().
private Counter counterBCPCreate;
private Counter counterBCPClose;
private Counter counterMainFileCreate;
private Counter counterMainFileClose;
private Counter counterBCPBytes;
private Counter counterMainFileBytes;
// Registered counters by key, so instances with the same prefix reuse one counter.
private static final Map<String, Counter> COUNTER_ROSTER = new ConcurrentHashMap<>();
/**
 * Reads this instance's attributes, makes sure the work directory exists,
 * resolves the Prometheus counters for the configured file prefix and
 * finally triggers the one-time global initialization.
 *
 * @throws IllegalArgumentException if the message type is
 *         {@code formatted_string} and no separator is configured
 */
@Override
protected void init0() {
    selectedFields = Optional.ofNullable(attrsDefine.getStringList(ATTR_SELECTED_FIELDS))
            .orElse(Collections.emptyList());
    filePrefix = attrsDefine.getStringOrDefault(ATTR_FILE_PREFIX, DEFAULT_FILE_PREFIX);
    fileSuffix = attrsDefine.getStringOrDefault(ATTR_FILE_SUFFIX, DEFAULT_FILE_SUFFIX);
    msgType = attrsDefine.getStringOrDefault(ATTR_MSG_TYPE, MSG_TYPE_JSON);
    subFileType = attrsDefine.getStringOrDefault(ATTR_SUB_FILE_TYPE, "none");
    workDir = attrsDefine.getStringOrDefault(ATTR_WORK_DIR, DEFAULT_WORK_DIR);

    // Compare against the already-defaulted msgType: the attribute is optional, so
    // dereferencing the raw attribute value would throw when it is not configured.
    if (MSG_TYPE_FORMATTED_STRING.equals(msgType)) {
        separator = StringEscapeUtils.unescapeJava(Optional.ofNullable(attrsDefine.getString(ATTR_SEPARATOR_NAME))
                .orElseThrow(() -> new IllegalArgumentException("separator参数缺失")));
    }

    // Create the output directory if it does not exist yet.
    checkForCreateDir(workDir);
    subFileInterval = attrsDefine.getIntOrDefault(ATTR_SUB_FILE_INTERVAL, DEFAULT_SUB_FILE_INTERVAL);
    rowsPerFile = attrsDefine.getIntOrDefault(ATTR_RAWS_LIMIT_PER_FILE, 0);

    // One counter per (prefix, event); instances sharing a prefix share the counters.
    counterBCPCreate = registerCounter(filePrefix + "_bcp_create");
    counterBCPClose = registerCounter(filePrefix + "_bcp_close");
    counterMainFileCreate = registerCounter(filePrefix + "_mainfile_create");
    counterMainFileClose = registerCounter(filePrefix + "_mainfile_close");
    counterBCPBytes = registerCounter(filePrefix + "_bcp_bytes");
    // The mainfile byte counter is the only one with a label ("method").
    counterMainFileBytes = registerCounter(filePrefix + "_mainfile_bytes", "method");

    initOnce();
}

/**
 * Returns the counter registered under the given name in the default
 * namespace, registering it on first use.
 *
 * @param name       metric name without namespace
 * @param labelNames label names of the counter; none for an unlabeled counter
 * @return the shared counter instance for that name
 */
private Counter registerCounter(String name, String... labelNames) {
    final String counterKey = getCounterKey(DEFAULT_NAMESPACE, name);
    return COUNTER_ROSTER.computeIfAbsent(counterKey, key -> {
        Counter.Builder builder = Counter.build()
                .namespace(DEFAULT_NAMESPACE)
                .name(name)
                .help(counterKey);
        if (labelNames.length > 0) {
            builder.labelNames(labelNames);
        }
        return builder.register();
    });
}
/**
 * Performs the initialization that must happen only once for all instances:
 * reads the global attributes and creates either the async writer threads
 * with their queues, or the single synchronous worker.
 *
 * <p>The state written here is static, so the lock is taken on the class
 * rather than on the instance. That way any other instance calling this
 * method waits until the first caller has finished building the queues and
 * workers, instead of returning while they are still incomplete.
 */
private void initOnce() {
    synchronized (TjFileWriter.class) {
        if (!INIT_ONCE.compareAndSet(false, true)) {
            return;
        }
        // NOTE(review): the SYNC and BLOCK descriptors declare a default of true, while the
        // fallbacks here are false — confirm which default is intended.
        attrSync = Optional.ofNullable(attrsDefine.getBoolean(ATTR_SYNC)).orElse(Boolean.FALSE);
        if (attrSync.equals(Boolean.FALSE)) {
            // Async mode: dedicated threads do the writing.
            threadNum = Math.max(Optional.ofNullable(attrsDefine.getInt(ATTR_THREAD_NUM)).orElse(1), 1);
            attrBlock = Optional.ofNullable(attrsDefine.getBoolean(ATTR_BLOCK)).orElse(Boolean.FALSE);
            // One thread per worker; each worker drains its own queue.
            executor = new ThreadPoolExecutor(threadNum, threadNum, 30L, TimeUnit.SECONDS,
                    new SynchronousQueue<>(), new NamedThreadFactory("file-write-worker"));
            queues = new ArrayList<>(threadNum);
            writeWorks = new ArrayList<>(threadNum);
            for (int i = 0; i < threadNum; i++) {
                final MpscArrayQueue<FileWriterItem> queue = new MpscArrayQueue<>(10240);
                queues.add(queue);
                WriteWorker writeWorker = new WriteWorker(queue, i);
                writeWorks.add(writeWorker);
                executor.execute(writeWorker);
            }
        } else {
            // Sync mode: a queue-less worker whose write method runs on the caller's thread.
            syncWriteWorker = new WriteWorker(null, 0);
        }
    }
}
/**
 * Builds the roster key of a counter.
 *
 * @param namespace metric namespace
 * @param name      metric name
 * @return {@code namespace} and {@code name} joined with an underscore
 */
private String getCounterKey(String namespace, String name) {
    return String.join("_", namespace, name);
}
/**
 * Builds the roster key of a gauge.
 *
 * @param namespace metric namespace
 * @param name      metric name
 * @return {@code namespace} and {@code name} joined with an underscore
 */
private String getGaugeKey(String namespace, String name) {
    return namespace + "_" + name;
}
/**
 * Writes one message, either on the calling thread (sync mode) or by
 * handing it to a writer thread (async mode). Messages arriving after
 * {@link #shutdown()} are dropped.
 *
 * @param msg the message to write
 */
@Override
public void write(AbstractNativeMsg msg) {
    if (!shutdown) {
        final FileWriterItem item = genWriteItem(msg);
        if (Boolean.TRUE.equals(attrSync)) {
            writeSync(item);
        } else {
            writeAsync(item);
        }
    }
}
/**
 * Writes the item to file on the calling thread, through the shared
 * synchronous worker.
 *
 * @param writerItem the item to write
 */
private void writeSync(FileWriterItem writerItem) {
    final WriteWorker worker = syncWriteWorker;
    worker.tryWrite2File(writerItem);
}
/**
 * Hands the item to one of the writer queues, chosen round-robin by a
 * global counter. When blocking is enabled the call retries with a short,
 * growing sleep (capped at 8 ms) until the queue accepts the item;
 * otherwise an item that does not fit is dropped.
 *
 * @param writerItem the item to enqueue
 */
private void writeAsync(FileWriterItem writerItem) {
    final long sequence = TOTAL.getAndIncrement();
    final MpscArrayQueue<FileWriterItem> target = queues.get((int) (sequence % threadNum));

    // Temporary diagnostics: dump all queue sizes every 100,000 messages.
    if (sequence % 100_000 == 0) {
        List<Integer> sizes = queues.stream().map(Queue::size).collect(Collectors.toList());
        log.info("===>>> FileWriter消费队列中元素个数为:{}", Joiner.on(", ").join(sizes));
    }

    int backoff = 0;
    try {
        for (;;) {
            if (target.offer(writerItem)) {
                break;
            }
            // block == true means no message may be lost: wait for free capacity.
            if (!Boolean.TRUE.equals(attrBlock)) {
                break;
            }
            backoff++;
            Thread.sleep(Math.min(backoff, 8));
        }
    } catch (InterruptedException e) {
        exceptionHandle(e);
    }
}
/**
 * Handles an interruption that arrived while waiting for queue capacity:
 * logs it through the class logger and restores the thread's interrupt
 * status so that callers further up the stack can still observe it.
 *
 * @param e the interruption caught by the caller
 */
private void exceptionHandle(InterruptedException e) {
    log.error("FileWriter interrupted while waiting for queue capacity: ", e);
    // Catching InterruptedException clears the flag; set it again.
    Thread.currentThread().interrupt();
}
/**
 * Packs a message together with this instance's output settings and
 * counters into a self-contained work item for a writer.
 *
 * @param msg the incoming message
 * @return a new item carrying a partial clone of {@code msg}
 */
private FileWriterItem genWriteItem(AbstractNativeMsg msg) {
    final FileWriterItem item = new FileWriterItem();

    // The item carries a partial clone, not the caller's message object.
    item.setMsg(msg.partialClone());

    // Output location and file naming.
    item.setFilePrefix(filePrefix);
    item.setFileSuffix(fileSuffix);
    item.setSubFileType(subFileType);
    item.setWorkDir(workDir);

    // Row content and file rolling.
    item.setSelectedFields(selectedFields);
    item.setMsgType(msgType);
    item.setSep(separator);
    item.setSubFileInterval(subFileInterval);
    item.setRowsPerFile(rowsPerFile);

    // Monitoring counters.
    item.setCounterBCPCreate(counterBCPCreate);
    item.setCounterBCPClose(counterBCPClose);
    item.setCounterMainFileCreate(counterMainFileCreate);
    item.setCounterMainFileClose(counterMainFileClose);
    item.setCounterBCPBytes(counterBCPBytes);
    item.setCounterMainFileBytes(counterMainFileBytes);
    return item;
}
/**
 * Makes sure the given directory exists, creating it (and any missing
 * parents) when necessary. Failures are logged rather than thrown, so
 * initialization continues as before.
 *
 * @param dirName path of the directory to check or create
 */
private synchronized void checkForCreateDir(String dirName) {
    File dir = Paths.get(dirName).toFile();
    if (dir.exists()) {
        if (!dir.isDirectory()) {
            log.error("work dir path exists but is not a directory: [{}]", dir.getAbsolutePath());
        }
        return;
    }
    // mkdirs() returns false both on failure and when another process created the
    // directory in the meantime; only the former is an error.
    if (!dir.mkdirs() && !dir.isDirectory()) {
        log.error("failed to create work dir: [{}]", dir.getAbsolutePath());
    }
}
/**
 * Stops accepting messages, asks every async worker to stop (waiting for
 * each in turn) and then shuts the writer executor down.
 */
@Override
public void shutdown() {
    shutdown = true;
    // NOTE(review): only the async workers are stopped here; the sync-mode worker is
    // not referenced, so files opened by it are not closed by this method — confirm.
    if (writeWorks != null) {
        for (WriteWorker worker : writeWorks) {
            worker.shutdown();
        }
    }
    if (executor != null && !executor.isShutdown()) {
        executor.shutdown();
    }
}
//******************************************************************************************************************//
//******************************************** WRITE THREAD START ********************************************//
//******************************************************************************************************************//
/**
* debug 日志写线程
*/
private static class WriteWorker implements Runnable, Closeable {
// Lifecycle states stored in {@code status}.
private static final int WORKER_STATUS_NEW = -1;
private static final int WORKER_STATUS_RUNNING = 0;
private static final int WORKER_STATUS_EXIT = 1;
// Upper bound on the number of 10 ms waits in shutdown().
private static final int WAITING_MAX_TIMES = 1000;
// Message keys controlling the raw ("mainfile") output.
public static final String RAW_FILE_FILED = "_RAW_FILE_FIELD";
public static final String RAW_FILE_ENABLE = "_RAW_FILE_ENABLE";
public static final String RAW1_ENABLE = "_RAW1_ENABLE";
public static final String RAW2_ENABLE = "_RAW2_ENABLE";
public static final String RAW1_BASE64_ENABLE = "_RAW1_BASE64_ENABLE";
public static final String RAW2_BASE64_ENABLE = "_RAW2_BASE64_ENABLE";
// Written after every section of a mainfile.
private static final String TWO_LINE_SEP = "\n\n";
private static final String FILE_NAME_SEPARATOR = "-";
// Reusable per-thread buffer for building formatted rows.
private static final ThreadLocal<StringBuilder> STRING_BUILDER_THREAD_LOCAL = ThreadLocal.withInitial(StringBuilder::new);
// Set by shutdown() to ask the run loop to stop.
private final AtomicBoolean shutdown = new AtomicBoolean(false);
// Source of work items; null for the synchronous worker.
private final MpscArrayQueue<FileWriterItem> queue;
private final int index;
// Written by the worker thread in run() and polled by shutdown() from another
// thread, hence volatile.
private volatile int status;
// Sequence number used in generated bcp file names.
private final AtomicLong fileCount = new AtomicLong(0);
// Open writers keyed by (file prefix, file suffix).
private final MultiKeyMap<String, WriterWrapper> writerMap = new MultiKeyMap<>();
// Sequence number used in generated mainfile names.
private final AtomicLong mainFileCount = new AtomicLong(0);
private final String deployCityCode;
// Whether Prometheus counters are updated; false when the env key is absent.
private final Boolean prometheusEnable = Optional.ofNullable(Env.INSTANCE.getBoolean(PROMETHEUS_MONITOR_ENABLE)).orElse(false);
public WriteWorker(MpscArrayQueue<FileWriterItem> queue, int index) {
this.queue = queue;
this.index = index;
this.status = WORKER_STATUS_NEW;
// 获取不成部署的地市区域编码
this.deployCityCode = Optional.ofNullable(Env.INSTANCE.getString(DEPLOY_CITY_CODE))
.orElse("null");
}
/**
 * Worker loop: drains the queue into files until a shutdown is requested
 * or the thread is interrupted, sleeping with a growing back-off (capped
 * at 32 ms) while the queue is empty. On exit the remaining items are
 * flushed and all writers closed via {@link #close()}.
 *
 * <p>An interruption, whether seen between iterations or while sleeping,
 * ends the loop. The interrupt status is restored only after the close
 * logic has run, so closing is not affected by a pending interrupt.
 */
@Override
public void run() {
    int sleep = 1;
    boolean interrupted = false;
    status = WORKER_STATUS_RUNNING;
    while (!shutdown.get()) {
        // Thread.interrupted() clears the flag, so remember it for later.
        if (Thread.interrupted()) {
            interrupted = true;
            break;
        }
        try {
            boolean success = queue.drain(this::tryWrite2File) > 0;
            if (!success && queue.isEmpty()) {
                Thread.sleep(Math.min(sleep++, 32));
            } else {
                sleep = 1;
            }
        } catch (InterruptedException e) {
            // Interrupted while idle: stop instead of logging and looping on.
            interrupted = true;
            break;
        } catch (Throwable e) {
            // A failing item must not kill the worker; log and keep going.
            log.error("write thread error: ", e);
        }
    }
    try {
        close();
    } finally {
        // Always publish EXIT so that shutdown() stops waiting.
        status = WORKER_STATUS_EXIT;
    }
    log.info("FileWrite Thread exit, stop:{}, interrupted:{}", shutdown, interrupted);
    if (interrupted) {
        Thread.currentThread().interrupt();
    }
}
/**
 * Flushes whatever is still queued and closes every open writer,
 * removing each one from the writer map once it is closed. A failure on
 * one writer is logged and does not prevent the others from being closed.
 *
 * <p>Safe to call on the synchronous worker, which has no queue.
 */
@Override
public void close() {
    try {
        log.info("{}开始执行关闭逻辑", Thread.currentThread().getName());
        // The synchronous worker is created without a queue.
        if (queue != null && !queue.isEmpty()) {
            // Try to write the leftover items before closing the files.
            log.info("file-write-worker-[{}]缓存队列非空,剩余日志数:[{}], 尝试将队列中数据写入文件", index, queue.size());
            queue.drain(this::tryWrite2File);
        }
        Iterator<Map.Entry<MultiKey<? extends String>, WriterWrapper>> iterator = writerMap.entrySet().iterator();
        while (iterator.hasNext()) {
            WriterWrapper wrapper = iterator.next().getValue();
            if (wrapper == null) {
                // A key can be left with a null value after its writer expired;
                // there is nothing to close for it.
                iterator.remove();
                continue;
            }
            try {
                // Close synchronously on this thread during shutdown.
                wrapper.close();
                if (prometheusEnable) {
                    wrapper.getCounterBCPClose().inc();
                    wrapper.getCounterBCPBytes().inc(wrapper.getFile().length());
                }
                // Drop the entry once the writer is closed.
                iterator.remove();
            } catch (Throwable e) {
                log.error("进程关闭,关闭文件[{}]出现异常:", wrapper.getFile().getAbsolutePath(), e);
            }
        }
    } catch (Exception e) {
        log.error("{}关闭异常:", Thread.currentThread().getName(), e);
    }
}
/**
 * Requests this worker to stop and waits until its run loop has exited,
 * or until the maximum wait ({@code WAITING_MAX_TIMES} polls of 10 ms)
 * has elapsed, whichever comes first. Only the first call waits.
 */
public void shutdown() {
    if (!shutdown.compareAndSet(false, true)) {
        return;
    }
    int times = 0;
    try {
        // Keep waiting only while the worker has not exited AND the budget is not used up.
        while (status != WORKER_STATUS_EXIT && times < WAITING_MAX_TIMES) {
            TimeUnit.MILLISECONDS.sleep(10);
            times++;
        }
    } catch (InterruptedException e) {
        log.info("file-write-worker-{}关闭异常:", index, e);
        // Preserve the interrupt for the caller.
        Thread.currentThread().interrupt();
    } catch (Exception e) {
        log.info("file-write-worker-{}关闭异常:", index, e);
    }
}
/**
 * Writes one item: resolves the writer for the item's prefix/suffix
 * (rolling to a new one when the current writer has expired), handles
 * the optional raw mainfile and then appends the formatted row.
 * I/O errors are logged and the item is skipped.
 *
 * @param writerItem the item to write
 */
private void tryWrite2File(FileWriterItem writerItem) {
    // TODO to be improved: file rolling is currently only triggered by a new message arriving.
    WriterWrapper target = getWriterOrCreate(writerItem);
    if (target == null) {
        return;
    }
    if (resetWriterIfNeed(target)) {
        // The old writer was retired; fetch (or create) its replacement.
        target = getWriterOrCreate(writerItem);
        if (target == null) {
            return;
        }
    }
    try {
        // Raw stream file first.
        dealWithRawFile(writerItem, target);
        // Then the bcp row. The message is a copy, so fields added while handling the
        // raw file do not need to be reset afterwards.
        final String row = genWriteMsg(writerItem);
        if (row != null) {
            target.writeRow(row);
        }
    } catch (IOException e) {
        log.error("写入内容异常, fileName:[{}], error", target.getFile().getAbsolutePath(), e);
    }
}
/**
 * Decides whether a mainfile has to be produced for the item and, if so,
 * delegates to {@code try2HandleMainFile}.
 *
 * @param writerItem item whose message carries the raw-file switches and data
 * @param wrapper    writer of the bcp file the mainfile belongs to
 */
private void dealWithRawFile(FileWriterItem writerItem, WriterWrapper wrapper) {
    final AbstractNativeMsg msg = writerItem.getMsg();

    Boolean rawFileEnable = (Boolean) msg.get(RAW_FILE_ENABLE);
    if (!Boolean.TRUE.equals(rawFileEnable)) {
        return;
    }

    Boolean raw1Enable = (Boolean) msg.get(RAW1_ENABLE);
    Boolean raw2Enable = (Boolean) msg.get(RAW2_ENABLE);
    Boolean raw1Base64Enable = (Boolean) msg.get(RAW1_BASE64_ENABLE);
    Boolean raw2Base64Enable = (Boolean) msg.get(RAW2_BASE64_ENABLE);
    // At least one of the four output options must be switched on.
    boolean anyOptionOn = Boolean.TRUE.equals(raw1Enable)
            || Boolean.TRUE.equals(raw2Enable)
            || Boolean.TRUE.equals(raw1Base64Enable)
            || Boolean.TRUE.equals(raw2Base64Enable);
    if (!anyOptionOn) {
        return;
    }

    // Count the attempt.
    if (prometheusEnable) {
        wrapper.getCounterMainFileCreate().inc();
    }

    byte[] raw1 = (byte[]) msg.get(FixedFieldDict.RAW_1);
    byte[] raw2 = (byte[]) msg.get(FixedFieldDict.RAW_2);
    String rawFileField = (String) msg.get(RAW_FILE_FILED);

    // Nothing to do when there are no raw bytes at all, unless the message already
    // names a mainfile (which then has to be copied to its final place).
    if (!rawDataEnable(raw1) && !rawDataEnable(raw2)) {
        boolean namesExistingFile = StringUtils.isNotEmpty(rawFileField)
                && StringUtils.isNotEmpty(msg.getString(rawFileField));
        if (!namesExistingFile) {
            return;
        }
    }

    try2HandleMainFile(writerItem, wrapper, raw1Enable, raw2Enable, raw1Base64Enable, raw2Base64Enable, raw1, raw2);
}
/**
* Produces the mainfile for one item and stores its absolute path in the
* message field named by the item's {@code RAW_FILE_FILED} entry.
* If that field already holds the path of an existing file, the file is
* copied into the mainfile directory (keeping its own extension);
* otherwise a new mainfile is written from the raw byte arrays.
* Nothing is written when the message carries no {@code RAW_FILE_FILED} entry.
*
* @param writerItem item being written
* @param wrapper writer of the bcp file; its path determines the mainfile directory
* @param raw1Enable whether raw1 is written as text
* @param raw2Enable whether raw2 is written as text
* @param raw1Base64Enable whether raw1 is written Base64-encoded
* @param raw2Base64Enable whether raw2 is written Base64-encoded
* @param raw1 first raw byte array, may be null or empty
* @param raw2 second raw byte array, may be null or empty
*/
private void try2HandleMainFile(FileWriterItem writerItem,
WriterWrapper wrapper,
Boolean raw1Enable,
Boolean raw2Enable,
Boolean raw1Base64Enable,
Boolean raw2Base64Enable,
byte[] raw1,
byte[] raw2) {
// The mainfile lives next to the bcp file: the bcp path with its suffix cut off is used as the directory.
// NOTE(review): if the suffix is not found, lastIndexOf returns -1 and substring throws; with an
// empty suffix the directory path equals the bcp file path itself — confirm suffix is always non-empty.
String filePath = wrapper.getFile().getAbsolutePath();
int indexOf = filePath.lastIndexOf(wrapper.getFileSuffix());
String mainFilePath = filePath.substring(0, indexOf);
File mainFileDir = Paths.get(mainFilePath).toFile();
if (!mainFileDir.exists()) {
mainFileDir.mkdirs();
}
String mainFileName = genMainFileName(wrapper);
File mainFile = Paths.get(mainFileDir.getAbsolutePath(), mainFileName).toFile();
String rawFileField = (String) writerItem.getMsg().get(RAW_FILE_FILED);
if (rawFileField != null) {
try {
String existMainFilePath = (String) writerItem.getMsg().get(rawFileField);
if (StringUtils.isNotEmpty(existMainFilePath)) {
// The message already points at a mainfile: copy it instead of writing a new one.
File f = Paths.get(existMainFilePath).toFile();
if (f.exists()) {
String existMainFileName = f.getName();
// Keep everything from the first dot of the existing name as the extension,
// and everything before the first dot of the generated name as the base name.
String existMainFileSuffix = existMainFileName.indexOf(".") > 0 ? existMainFileName.substring(existMainFileName.indexOf(".")) : "";
String targetMainFileName = mainFileName.indexOf(".") > 0 ? mainFileName.substring(0, mainFileName.indexOf(".")) : mainFileName;
mainFile = Paths.get(mainFileDir.getAbsolutePath(), targetMainFileName + existMainFileSuffix).toFile();
FileUtil.copy(f, mainFile, true);
// The mainfile was copied once: record it in the counters.
if (prometheusEnable){
wrapper.getCounterMainFileClose().inc();
wrapper.getCounterMainFileBytes().labels("COPY").inc(mainFile.length());
}
if (mainFile.exists()) {
writerItem.getMsg().put(rawFileField, mainFile.getAbsolutePath());
}
} else {
// The referenced file is gone: log it and clear the field.
log.error("rawFileField:{} has content:{} but no file exist", rawFileField, existMainFilePath);
writerItem.getMsg().put(rawFileField, null);
}
return;
} else {
// NOTE(review): FileWriter uses the platform default charset here — confirm that is intended.
try (java.io.FileWriter mainFileWriter = new FileWriter(mainFile)) {
// Write the enabled raw sections.
writeMainFileContent(mainFileWriter, raw1Enable, raw2Enable, raw1Base64Enable, raw2Base64Enable, raw1, raw2);
mainFileWriter.flush();
}
// A mainfile was written: record it in the counters.
if (prometheusEnable){
wrapper.getCounterMainFileClose().inc();
wrapper.getCounterMainFileBytes().labels("WRITE").inc(mainFile.length());
}
}
// Store the mainfile path in the raw-file field of the message.
writerItem.getMsg().put(rawFileField, mainFile.getAbsolutePath());
} catch (IOException e) {
log.error("写MAINFILE异常,filePrefix:[{}], 异常:", writerItem.getFilePrefix(), e);
}
}
}
/**
 * Writes the enabled sections of a mainfile: for raw1 and then raw2, the
 * bytes as text and/or their Base64 form, each followed by a blank-line
 * separator. An absent or empty byte array produces no output.
 *
 * @param fileWriter       destination
 * @param raw1Enable       write raw1 as text
 * @param raw2Enable       write raw2 as text
 * @param raw1Base64Enable write raw1 Base64-encoded
 * @param raw2Base64Enable write raw2 Base64-encoded
 * @param raw1             first raw byte array, may be null or empty
 * @param raw2             second raw byte array, may be null or empty
 * @throws IOException if writing fails
 */
private void writeMainFileContent(FileWriter fileWriter,
                                  Boolean raw1Enable,
                                  Boolean raw2Enable,
                                  Boolean raw1Base64Enable,
                                  Boolean raw2Base64Enable,
                                  byte[] raw1,
                                  byte[] raw2) throws IOException {
    writeRawSection(fileWriter, raw1, raw1Enable, raw1Base64Enable);
    writeRawSection(fileWriter, raw2, raw2Enable, raw2Base64Enable);
}

/**
 * Writes the text and/or Base64 form of one raw byte array, each followed
 * by the section separator.
 */
private void writeRawSection(FileWriter out, byte[] raw, Boolean asText, Boolean asBase64) throws IOException {
    if (!rawDataEnable(raw)) {
        return;
    }
    if (Boolean.TRUE.equals(asText)) {
        out.write(new String(raw));
        out.write(TWO_LINE_SEP);
    }
    if (Boolean.TRUE.equals(asBase64)) {
        out.write(Base64.getEncoder().encodeToString(raw));
        out.write(TWO_LINE_SEP);
    }
}
/**
 * @param raw byte array to check, may be null
 * @return {@code true} when {@code raw} is present and holds at least one byte
 */
private boolean rawDataEnable(byte[] raw) {
    if (raw == null) {
        return false;
    }
    return raw.length != 0;
}
/**
 * Generates a mainfile name of the form
 * {@code <prefix>_raw_<workerIndex>_<sequence>_<currentMillis>.txt}.
 *
 * @param wrapper writer whose file prefix is used
 * @return the generated file name
 */
private String genMainFileName(WriterWrapper wrapper) {
    return wrapper.getFilePrefix() + "_raw_"
            + index + "_"
            + mainFileCount.incrementAndGet() + "_"
            + System.currentTimeMillis()
            + ".txt";
}
/**
 * Returns the cached writer for the item's (prefix, suffix) pair, creating
 * and caching a new one when none exists.
 *
 * <p>The new writer is always cached, whether or not monitoring is
 * enabled; only the creation counter depends on the monitoring switch.
 *
 * @param writerItem item that determines the writer to use
 * @return the writer, or {@code null} if it could not be created
 */
private WriterWrapper getWriterOrCreate(FileWriterItem writerItem) {
    WriterWrapper wrapper = writerMap.get(writerItem.getFilePrefix(), writerItem.getFileSuffix());
    if (wrapper != null) {
        return wrapper;
    }
    try {
        String fileNameForTJ = genFileNameForTJ(writerItem);
        wrapper = WriterWrapper.from(writerItem, fileNameForTJ, true);
        // Cache first so the writer is reused (and later closed) in every configuration.
        writerMap.put(writerItem.getFilePrefix(), writerItem.getFileSuffix(), wrapper);
        // A new bcp file was created: count it when monitoring is on.
        if (prometheusEnable) {
            writerItem.getCounterBCPCreate().inc();
        }
    } catch (IOException e) {
        log.error("获取writerWrapper异常,data:[{}], error:", writerItem, e);
    }
    return wrapper;
}
/**
 * Retires the writer if it has expired: removes it from the cache and
 * hands it over for asynchronous closing.
 *
 * <p>The cache entry is removed rather than overwritten with {@code null},
 * so no null-valued entries are left in the map for later iteration.
 *
 * @param wrapper the writer currently cached for its prefix/suffix
 * @return {@code true} if the writer was retired and a new one must be obtained
 */
private boolean resetWriterIfNeed(WriterWrapper wrapper) {
    if (!wrapper.isExpire()) {
        return false;
    }
    writerMap.removeMultiKey(wrapper.getFilePrefix(), wrapper.getFileSuffix());
    tryFlush(wrapper);
    return true;
}
/**
 * Closes a retired writer on the file-close pool and, when monitoring is
 * enabled, records the close and the final file size. Does nothing for a
 * null wrapper or one without an underlying writer.
 *
 * @param wrapper the writer to close, may be null
 */
private void tryFlush(WriterWrapper wrapper) {
    if (wrapper == null || wrapper.getWriter() == null) {
        return;
    }
    FILE_CLOSE_WORKER.execute(() -> {
        try {
            wrapper.close();
            // Update the monitoring counters once the file is closed.
            if (prometheusEnable) {
                wrapper.getCounterBCPClose().inc();
                wrapper.getCounterBCPBytes().inc(wrapper.getFile().length());
            }
        } catch (IOException e) {
            log.error("try flush file close writer error: ", e);
        }
    });
}
/**
 * Renders the item's message according to its message type.
 *
 * @param writerItem the item to render
 * @return the row text, or {@code null} when there is nothing to write
 * @throws UnsupportedOperationException for an unknown message type
 */
private String genWriteMsg(FileWriterItem writerItem) {
    final String type = writerItem.getMsgType();
    if (type.equals(MSG_TYPE_JSON)) {
        return jsonTypeMsg(writerItem);
    }
    if (type.equals(MSG_TYPE_FORMATTED_STRING)) {
        return formattedTypeMsg(writerItem);
    }
    throw new UnsupportedOperationException();
}
/**
 * Renders the selected fields as one separator-delimited row, with
 * control characters in each value escaped.
 *
 * @param writerItem the item to render
 * @return the row, or {@code null} when no fields are selected
 */
private String formattedTypeMsg(FileWriterItem writerItem) {
    final List<String> fields = writerItem.getSelectedFields();
    if (fields.isEmpty()) {
        return null;
    }
    final AbstractNativeMsg msg = writerItem.getMsg();
    // Reuse the per-thread buffer.
    final StringBuilder row = STRING_BUILDER_THREAD_LOCAL.get();
    row.setLength(0);
    final int lastIndex = fields.size() - 1;
    for (int i = 0; i <= lastIndex; i++) {
        row.append(replaceSensitiveChars(msg.get(fields.get(i)), writerItem));
        // Separator between values only, not after the last one.
        if (i < lastIndex) {
            row.append(writerItem.getSep());
        }
    }
    return row.toString();
}
/**
 * Converts a field value to text and escapes tab, carriage return and
 * line feed as the two-character sequences {@code \t}, {@code \r} and
 * {@code \n}. The separator itself is not escaped.
 *
 * @param object     field value, may be null
 * @param writerItem item being rendered (currently unused)
 * @return the escaped text; the empty string for a null value
 */
private String replaceSensitiveChars(Object object, FileWriterItem writerItem) {
    if (object == null) {
        return "";
    }
    return Objects.toString(object)
            .replace("\t", "\\t")
            .replace("\r", "\\r")
            .replace("\n", "\\n");
}
/**
 * Renders the selected fields of the message as a JSON object string.
 *
 * @param writerItem the item to render
 * @return the JSON text containing one entry per selected field
 */
private String jsonTypeMsg(FileWriterItem writerItem) {
    final List<String> fields = writerItem.getSelectedFields();
    final AbstractNativeMsg msg = writerItem.getMsg();
    final JSONObject json = new JSONObject(fields.size());
    fields.forEach(field -> json.put(field, msg.get(field)));
    return json.toJSONString();
}
/**
 * Generates a bcp file name following the TJ naming convention.
 *
 * <pre>
 * 111 - cityCode - epochSeconds - (workerIndex "0" sequence) - UPPERCASE_PREFIX - xx1_bq + suffix
 * </pre>
 *
 * The prefix is upper-cased with {@link Locale#ROOT} so the result does not
 * depend on the JVM's default locale.
 *
 * <p>NOTE(review): the original specification example
 * ({@code 111-511100-1620575830-06580-DATA_IM-0.xx1_bq.bcp}) shows
 * {@code 0.xx1_bq} as the last part, while this code emits {@code xx1_bq} —
 * confirm which one is expected.
 *
 * @param writerItem item providing the file prefix and suffix
 * @return the generated file name
 */
private String genFileNameForTJ(FileWriterItem writerItem) {
    return new StringBuilder("111").append(FILE_NAME_SEPARATOR)
            .append(deployCityCode).append(FILE_NAME_SEPARATOR)
            .append(System.currentTimeMillis() / 1000).append(FILE_NAME_SEPARATOR)
            .append(index).append("0").append(fileCount.incrementAndGet()).append(FILE_NAME_SEPARATOR)
            .append(writerItem.getFilePrefix().toUpperCase(Locale.ROOT)).append(FILE_NAME_SEPARATOR)
            .append("xx1_bq")
            .append(writerItem.getFileSuffix())
            .toString();
}
}
//******************************************************************************************************************//
//******************************************** WRITE THREAD END ********************************************//
//******************************************************************************************************************//
}
上面的代码定义了一个静态内部类 WriteWorker 作为消费者:生产者线程只负责把消息放入队列,由 WriteWorker 线程异步地从队列中取出消息并写入文件,从而用非阻塞、异步的方式处理 IO。