Skip to content

Commit 0ed34e1

Browse files
committed
add Http-Proxy for TIS
1 parent df93f8b commit 0ed34e1

File tree

30 files changed

+236
-76
lines changed

30 files changed

+236
-76
lines changed

tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/aliyun/AliyunEndpoint.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.qlangtech.tis.plugin.AuthToken.IAliyunAccessKey;
88
import com.qlangtech.tis.plugin.HttpEndpoint;
99
import com.qlangtech.tis.plugin.HttpEndpoint.IAliyunEndpoint;
10+
import com.qlangtech.tis.plugin.IEndTypeGetter;
1011
import org.apache.commons.collections.CollectionUtils;
1112
import org.apache.commons.lang3.StringUtils;
1213

@@ -43,11 +44,16 @@ public String getEndpointHost() {
4344
}
4445

4546
@TISExtension()
46-
public static class DefaultDescriptor extends HttpEndpoint.DefaultDescriptor {
47+
public static class DefaultDescriptor extends HttpEndpoint.DefaultDescriptor implements IEndTypeGetter {
4748
public DefaultDescriptor() {
4849
super(KEY_FIELD_ALIYUN_TOKEN);
4950
}
5051

52+
@Override
53+
public EndType getEndType() {
54+
return EndType.Aliyun;
55+
}
56+
5157
@Override
5258
public String getDisplayName() {
5359
return KEY_FIELD_ALIYUN_TOKEN;

tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataFlowDataXProcessor.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,10 +116,6 @@ public IDataxReader getReader(IPluginContext pluginCtx) {
116116
throw new UnsupportedOperationException("dataflow processor not support single reader getter");
117117
}
118118

119-
@Override
120-
public TableAliasMapper getTabAlias(IPluginContext pluginCtx) {
121-
return TableAliasMapper.Null;
122-
}
123119

124120
@Override
125121
public void afterSaved(IPluginContext pluginContext, Optional<Context> context) {
@@ -419,6 +415,11 @@ public DataXCfgGenerator.GenerateCfgs getDataxCfgFileNames(IPluginContext plugin
419415
return DataxProcessor.getDataxCfgFileNames(pluginCtx, partialTrigger, this);
420416
}
421417

418+
@Override
419+
public TableAliasMapper getTabAlias(IPluginContext pluginCtx, boolean withDft) {
420+
return TableAliasMapper.Null;
421+
}
422+
422423

423424
@TISExtension()
424425
public static class DescriptorImpl extends Descriptor<IAppSource> implements IEndTypeGetter {

tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/BasicDataXRdbmsWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ public void run() {
171171

172172
final TableAliasMapper tableAliasMapper
173173
= execContext.getAttribute(TableAlias.class.getSimpleName(), () -> {
174-
return execContext.getProcessor().getTabAlias(null);
174+
return execContext.getProcessor().getTabAlias(null, true);
175175
});
176176

177177
BasicDataSourceFactory dsFactory = ((BasicDataSourceFactory) getDataSourceFactory());

tis-datax/tis-datax-dfs-plugin/src/main/java/com/qlangtech/tis/plugin/datax/resmatcher/WildcardDFSResMatcher.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public static Optional<TableMap> getTableMap(IPluginContext pluginContext, Strin
6565
throw new IllegalArgumentException("param dataXName can not be empty");
6666
}
6767
IDataxProcessor dataxProcessor = DataxProcessor.load(pluginContext, dataXName);
68-
TableAliasMapper tabAlias = dataxProcessor.getTabAlias(pluginContext);
68+
TableAliasMapper tabAlias = dataxProcessor.getTabAlias(pluginContext, true);
6969
Optional<TableMap> tabAlia = tabAlias.getFirstTableMap();
7070
return tabAlia;
7171
}
@@ -94,7 +94,7 @@ public List<ColumnMetaData> getTableMetadata(IPluginContext pluginContext, Strin
9494
*/
9595
@Override
9696
public SourceColsMeta getSourceColsMeta(ITDFSSession hdfsSession, Optional<String> entityName, String path, IDataxProcessor processor) {
97-
TableAliasMapper tabAlias = processor.getTabAlias(null);
97+
TableAliasMapper tabAlias = processor.getTabAlias(null, false);
9898
Optional<TableAlias> findMapper = tabAlias.findFirst();
9999
IDataxProcessor.TableMap tabMapper
100100
= (IDataxProcessor.TableMap) findMapper.orElseThrow(() -> new NullPointerException("TableAlias can not be null"));
@@ -110,7 +110,7 @@ public List<ISelectedTab> getSelectedTabs(IDFSReader dfsReader) {
110110
}
111111

112112
IDataxProcessor processor = DataxProcessor.load(IPluginContext.getThreadLocalInstance(), reader.dataXName);
113-
TableAliasMapper tabAlias = processor.getTabAlias(IPluginContext.getThreadLocalInstance());
113+
TableAliasMapper tabAlias = processor.getTabAlias(IPluginContext.getThreadLocalInstance(), false);
114114
Optional<TableAlias> findMapper = tabAlias.findFirst();
115115
if (findMapper.isPresent()) {
116116
IDataxProcessor.TableMap tabMapper = (IDataxProcessor.TableMap) findMapper.get();
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# DFS数据读取插件使用说明
2+
3+
## 功能概述
4+
5+
本插件用于从分布式文件系统(如HDFS、FTP服务器等)中批量读取数据文件。支持通过匹配规则自动查找文件,无需手工指定每个文件路径。
6+
7+
## 主要配置项
8+
9+
### 1. 文件系统连接(dfsLinker)
10+
配置目标文件系统的连接信息,包括服务器地址、端口、访问凭证等。系统支持HDFS、FTP等多种分布式存储。
11+
12+
### 2. 文件匹配规则(resMatcher)
13+
设置如何查找目标文件,支持两种方式:
14+
- **通配符匹配**:使用 `*.csv``data_*.txt` 等模式匹配文件名
15+
- **多层目录遍历**:可设置遍历深度,自动搜索子目录下的文件
16+
17+
### 3. 文件格式(fileFormat)
18+
选择数据文件的格式类型(如CSV、文本文件等),并配置分隔符、编码、压缩格式等参数。
19+
20+
## 配置验证
21+
22+
保存配置时,系统会立即连接到文件系统进行验证:
23+
- 显示找到的文件数量
24+
- 列出前5个匹配文件的完整路径
25+
- 如果找不到文件,会提示具体的错误原因
26+
27+
## 适用场景
28+
29+
- 从Hadoop集群批量导入日志文件
30+
- 定期从FTP服务器读取业务数据文件
31+
- 处理按日期分目录存储的历史数据
32+
- 一次性读取多个结构相同的数据文件
33+
34+
## 使用提示
35+
36+
1. 建议先用小范围路径测试匹配规则是否正确
37+
2. 确认文件格式配置与实际文件一致
38+
3. 注意文件系统的访问权限设置
39+
4. 多层目录遍历时,控制好遍历深度以避免扫描过多文件

tis-datax/tis-datax-dolphinscheduler-plugin/src/main/java/com/qlangtech/tis/plugin/datax/doplinscheduler/export/DolphinSchedulerURLBuilder.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,9 @@
2828
import com.qlangtech.tis.manage.common.HttpUtils.PostParam;
2929
import com.qlangtech.tis.manage.common.PostFormStreamProcess;
3030
import com.qlangtech.tis.manage.common.TisUTF8;
31-
import com.qlangtech.tis.plugin.datax.IWorkflowNode;
3231
import org.apache.commons.collections.CollectionUtils;
3332
import org.apache.commons.io.IOUtils;
3433
import org.apache.commons.lang.StringUtils;
35-
import org.jetbrains.annotations.NotNull;
3634
import org.slf4j.Logger;
3735
import org.slf4j.LoggerFactory;
3836

@@ -156,11 +154,13 @@ public DolphinSchedulerResponse applyPost(List<PostParam> params, Optional<Strea
156154

157155
private DolphinSchedulerResponse applyRequest(List<PostParam> params
158156
, Optional<StreamErrorProcess> streamErrorProcess, URL applyUrl, HTTPMethod httpMethod) {
159-
return HttpUtils.process(applyUrl, params, new PostFormStreamProcess<DolphinSchedulerResponse>() {
160-
@Override
161-
public List<Header> getHeaders() {
162-
return endpoint.appendToken(super.getHeaders());
163-
}
157+
158+
return HttpUtils.process(applyUrl, params, new PostFormStreamProcess<DolphinSchedulerResponse>( //
159+
endpoint.appendToken(Lists.newArrayList())) {
160+
// @Override
161+
// public List<Header> getHeaders() {
162+
// return endpoint.appendToken(super.getHeaders());
163+
// }
164164

165165
@Override
166166
public void error(int status, InputStream errstream, IOException e) throws Exception {

tis-datax/tis-datax-dolphinscheduler-plugin/src/main/java/com/qlangtech/tis/plugin/datax/doplinscheduler/export/ExportTISPipelineToDolphinscheduler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ protected void afterManipuldateProcess(IPluginContext pluginContext, Optional<Co
210210
addProjectParameters();
211211

212212

213-
IDataxProcessor dataxProcessor = DataxProcessor.load(pluginContext, itemsProcessor.getOriginIdentityId());
213+
IDataxProcessor dataxProcessor = DataxProcessor.load(pluginContext, itemsProcessor.getOriginIdentityId().orElseThrow());
214214
DSWorkflowPayload workflowPayload = new DSWorkflowPayload(this, dataxProcessor
215215
, BasicDistributedSPIDataXJobSubmit.getCommonDAOContext(pluginContext), new DolphinschedulerDistributedSPIDataXJobSubmit());
216216
WorkflowSPIInitializer<DSWorkflowInstance> workflowSPIInitializer = new WorkflowSPIInitializer<>(workflowPayload);

tis-datax/tis-datax-elasticsearch-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXElasticsearchWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ public Void varcharType(DataType type) {
244244
@Override
245245
public List<ESColumn> initialIndex(IDataxProcessor dataxProcessor) {
246246
ESTableAlias esSchema = null;
247-
Optional<TableAlias> first = dataxProcessor.getTabAlias(null).findFirst();
247+
Optional<TableAlias> first = dataxProcessor.getTabAlias(null, true).findFirst();
248248
if (first.isPresent()) {
249249
TableAlias value = first.get();
250250
if (!(value instanceof ESTableAlias)) {

tis-datax/tis-datax-kafka-plugin/src/main/java/com/qlangtech/tis/plugins/datax/kafka/writer/DataXKafkaWriter.java

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,7 @@
2525
import com.qlangtech.tis.datax.IDataxProcessor;
2626
import com.qlangtech.tis.datax.impl.DataxWriter;
2727
import com.qlangtech.tis.extension.Descriptor;
28-
import com.qlangtech.tis.extension.ElementPluginDesc;
2928
import com.qlangtech.tis.extension.TISExtension;
30-
import com.qlangtech.tis.extension.impl.PropertyType;
31-
import com.qlangtech.tis.extension.impl.SuFormProperties;
3229
import com.qlangtech.tis.lang.TisException;
3330
import com.qlangtech.tis.manage.common.TisUTF8;
3431
import com.qlangtech.tis.plugin.ValidatorCommons;
@@ -39,7 +36,9 @@
3936
import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules;
4037
import com.qlangtech.tis.plugins.datax.kafka.writer.protocol.KafkaProtocol;
4138
import com.qlangtech.tis.realtime.transfer.DTO;
39+
import com.qlangtech.tis.realtime.utils.NetUtils;
4240
import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler;
41+
import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler;
4342
import com.qlangtech.tis.trigger.util.JsonUtil;
4443
import org.apache.commons.lang.StringUtils;
4544
import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -51,6 +50,7 @@
5150
import org.slf4j.Logger;
5251
import org.slf4j.LoggerFactory;
5352

53+
import java.net.UnknownHostException;
5454
import java.util.Map;
5555
import java.util.Objects;
5656
import java.util.Optional;
@@ -150,7 +150,7 @@ public String getTemplate() {
150150

151151
@Override
152152
public IDataxContext getSubTask(
153-
Optional<IDataxProcessor.TableMap> tableMap,Optional<RecordTransformerRules> transformerRules) {
153+
Optional<IDataxProcessor.TableMap> tableMap, Optional<RecordTransformerRules> transformerRules) {
154154
return null;
155155
}
156156

@@ -196,12 +196,44 @@ public Map<String, Object> buildKafkaConfig(boolean isTest) {
196196
@TISExtension
197197
public static class DefaultDescriptor extends BaseDataxWriterDescriptor implements DataxWriter.IRewriteSuFormProperties {
198198

199-
// private transient SuFormProperties rewriteSubFormProperties;
199+
// private transient SuFormProperties rewriteSubFormProperties;
200200

201201
public DefaultDescriptor() {
202202
super();
203203
}
204204

205+
// @Override
206+
public boolean validateBootstrapServers(IFieldErrorHandler msgHandler, Context context, String fieldName, String value) {
207+
208+
String[] servs = org.apache.commons.lang3.StringUtils.split(value, ",");
209+
if (servs.length < 1) {
210+
msgHandler.addFieldError(context, fieldName, ValidatorCommons.MSG_EMPTY_INPUT_ERROR);
211+
return false;
212+
}
213+
214+
for (String host : servs) {
215+
if (!Validator.host.validate(msgHandler, context, fieldName, host)) {
216+
return false;
217+
}
218+
String[] split = org.apache.commons.lang3.StringUtils.split(host, ":");
219+
try {
220+
if (!NetUtils.isPortAvailable(split[0], Integer.parseInt(split[1]))) {
221+
msgHandler.addFieldError(context, fieldName, "地址:" + host + " 不可触达,请检查是否可用");
222+
return false;
223+
}
224+
} catch (Exception e) {
225+
UnknownHostException unknownHostException = null;
226+
if ((unknownHostException = ExceptionUtils.throwableOfType(e, UnknownHostException.class)) != null) {
227+
msgHandler.addFieldError(context, fieldName, "主机地址:‘" + unknownHostException.getMessage() + "’无法识别");
228+
return false;
229+
}
230+
throw new RuntimeException(e);
231+
}
232+
}
233+
234+
235+
return true;
236+
}
205237
// @Override
206238
// public SuFormProperties overwriteSubPluginFormPropertyTypes(SuFormProperties subformProps) throws Exception {
207239
//

tis-datax/tis-datax-test-common/src/main/java/com/qlangtech/tis/plugin/common/BasicTemplate.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,11 @@
77
import com.qlangtech.tis.datax.IDataxReader;
88
import com.qlangtech.tis.datax.IDataxReaderContext;
99
import com.qlangtech.tis.datax.IDataxWriter;
10+
import com.qlangtech.tis.datax.StoreResourceType;
1011
import com.qlangtech.tis.datax.TableAliasMapper;
1112
import com.qlangtech.tis.datax.impl.DataXCfgGenerator;
12-
import com.qlangtech.tis.datax.StoreResourceType;
1313
import com.qlangtech.tis.datax.impl.TransformerInfo;
1414
import com.qlangtech.tis.plugin.IPluginStore;
15-
import com.qlangtech.tis.plugin.datax.SelectedTab;
1615
import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules;
1716
import com.qlangtech.tis.plugin.ds.ISelectedTab;
1817
import com.qlangtech.tis.plugin.trigger.JobTrigger;
@@ -113,7 +112,7 @@ public DataXCfgGenerator.GenerateCfgs getDataxCfgFileNames(IPluginContext plugin
113112
}
114113

115114
@Override
116-
public TableAliasMapper getTabAlias(IPluginContext pluginCtx) {
115+
public TableAliasMapper getTabAlias(IPluginContext pluginCtx, boolean withDft) {
117116
return tableAliasMapper;
118117
}
119118

0 commit comments

Comments
 (0)