• 1. 引入依赖模块
  • 2. 建立测试类
  • 3. Scala测试代码:

    Linkis 提供了方便的JAVA和SCALA调用的接口,只需要引入ujes-client的模块就可以进行使用

    1. 引入依赖模块

    1. <dependency>
    2. <groupId>com.webank.wedatasphere.Linkis</groupId>
    3. <artifactId>Linkis-ujes-client</artifactId>
    4. <version>0.6.0</version>
    5. </dependency>

    2. 建立测试类

    建立Java的测试类UJESClientImplTestJ,具体接口含义可以见注释:

    1. package com.webank.bdp.dataworkcloud.ujes.client;
    2. import com.webank.wedatasphere.Linkis.common.utils.Utils;
    3. import com.webank.wedatasphere.Linkis.httpclient.dws.authentication.StaticAuthenticationStrategy;
    4. import com.webank.wedatasphere.Linkis.httpclient.dws.config.DWSClientConfig;
    5. import com.webank.wedatasphere.Linkis.httpclient.dws.config.DWSClientConfigBuilder;
    6. import com.webank.wedatasphere.Linkis.ujes.client.UJESClient;
    7. import com.webank.wedatasphere.Linkis.ujes.client.UJESClientImpl;
    8. import com.webank.wedatasphere.Linkis.ujes.client.request.JobExecuteAction;
    9. import com.webank.wedatasphere.Linkis.ujes.client.request.ResultSetAction;
    10. import com.webank.wedatasphere.Linkis.ujes.client.response.JobExecuteResult;
    11. import com.webank.wedatasphere.Linkis.ujes.client.response.JobInfoResult;
    12. import com.webank.wedatasphere.Linkis.ujes.client.response.JobProgressResult;
    13. import com.webank.wedatasphere.Linkis.ujes.client.response.JobStatusResult;
    14. import org.apache.commons.io.IOUtils;
    15. import java.util.concurrent.TimeUnit;
    16. public class UJESClientImplTestJ{
    17. public static void main(String[] args){
    18. // 1. 配置DWSClientBuilder,通过DWSClientBuilder获取一个DWSClientConfig
    19. DWSClientConfig clientConfig = ((DWSClientConfigBuilder) (DWSClientConfigBuilder.newBuilder()
    20. .addUJESServerUrl("http://${ip}:${port}") //指定ServerUrl,Linkis服务器端网关的地址,如http://{ip}:{port}
    21. .connectionTimeout(30000) //connectionTimeOut 客户端连接超时时间
    22. .discoveryEnabled(true).discoveryFrequency(1, TimeUnit.MINUTES) //是否启用注册发现,如果启用,会自动发现新启动的Gateway
    23. .loadbalancerEnabled(true) // 是否启用负载均衡,如果不启用注册发现,则负载均衡没有意义
    24. .maxConnectionSize(5) //指定最大连接数,即最大并发数
    25. .retryEnabled(false).readTimeout(30000) //执行失败,是否允许重试
    26. .setAuthenticationStrategy(new StaticAuthenticationStrategy()) //AuthenticationStrategy Linkis认证方式
    27. .setAuthTokenKey("${username}").setAuthTokenValue("${password}"))) //认证key,一般为用户名; 认证value,一般为用户名对应的密码
    28. .setDWSVersion("v1").build(); //Linkis后台协议的版本,当前版本为v1
    29. // 2. 通过DWSClientConfig获取一个UJESClient
    30. UJESClient client = new UJESClientImpl(clientConfig);
    31. // 3. 开始执行代码
    32. JobExecuteResult jobExecuteResult = client.execute(JobExecuteAction.builder()
    33. .setCreator("LinkisClient-Test") //creator,请求Linkis的客户端的系统名,用于做系统级隔离
    34. .addExecuteCode("show tables") //ExecutionCode 请求执行的代码
    35. .setEngineType(JobExecuteAction.EngineType$.MODULE$.HIVE()) // 希望请求的Linkis的执行引擎类型,如Spark hive等
    36. .setUser("johnnwang") //User,请求用户;用于做用户级多租户隔离
    37. .build());
    38. System.out.println("execId: " + jobExecuteResult.getExecID() + ", taskId: " + jobExecuteResult.taskID());
    39. // 4. 获取脚本的执行状态
    40. JobStatusResult status = client.status(jobExecuteResult);
    41. while(!status.isCompleted()) {
    42. // 5. 获取脚本的执行进度
    43. JobProgressResult progress = client.progress(jobExecuteResult);
    44. Utils.sleepQuietly(500);
    45. status = client.status(jobExecuteResult);
    46. }
    47. // 6. 获取脚本的Job信息
    48. JobInfoResult jobInfo = client.getJobInfo(jobExecuteResult);
    49. // 7. 获取结果集列表(如果用户一次提交多个SQL,会产生多个结果集)
    50. String resultSet = jobInfo.getResultSetList(client)[0];
    51. // 8. 通过一个结果集信息,获取具体的结果集
    52. Object fileContents = client.resultSet(ResultSetAction.builder().setPath(resultSet).setUser(jobExecuteResult.getUser()).build()).getFileContent();
    53. System.out.println("fileContents: " + fileContents);
    54. IOUtils.closeQuietly(client);
    55. }
    56. }

    运行上述的代码即可以和Linkis进行交互

    3. Scala测试代码:

    1. import java.util.concurrent.TimeUnit
    2. import com.webank.wedatasphere.Linkis.common.utils.Utils
    3. import com.webank.wedatasphere.Linkis.httpclient.dws.authentication.StaticAuthenticationStrategy
    4. import com.webank.wedatasphere.Linkis.httpclient.dws.config.DWSClientConfigBuilder
    5. import com.webank.wedatasphere.Linkis.ujes.client.request.JobExecuteAction.EngineType
    6. import com.webank.wedatasphere.Linkis.ujes.client.request.{JobExecuteAction, ResultSetAction}
    7. import org.apache.commons.io.IOUtils
    8. object UJESClientImplTest extends App {
    9. // 1. 配置DWSClientBuilder,通过DWSClientBuilder获取一个DWSClientConfig
    10. val clientConfig = DWSClientConfigBuilder.newBuilder()
    11. .addUJESServerUrl("http://${ip}:${port}") //指定ServerUrl,Linkis服务器端网关的地址,如http://{ip}:{port}
    12. .connectionTimeout(30000) //connectionTimeOut 客户端连接超时时间
    13. .discoveryEnabled(true).discoveryFrequency(1, TimeUnit.MINUTES) //是否启用注册发现,如果启用,会自动发现新启动的Gateway
    14. .loadbalancerEnabled(true) // 是否启用负载均衡,如果不启用注册发现,则负载均衡没有意义
    15. .maxConnectionSize(5) //指定最大连接数,即最大并发数
    16. .retryEnabled(false).readTimeout(30000) //执行失败,是否允许重试
    17. .setAuthenticationStrategy(new StaticAuthenticationStrategy()) //AuthenticationStrategy Linkis认证方式
    18. .setAuthTokenKey("${username}").setAuthTokenValue("${password}") //认证key,一般为用户名; 认证value,一般为用户名对应的密码
    19. .setDWSVersion("v1").build() //Linkis后台协议的版本,当前版本为v1
    20. // 2. 通过DWSClientConfig获取一个UJESClient
    21. val client = UJESClient(clientConfig)
    22. // 3. 开始执行代码
    23. val jobExecuteResult = client.execute(JobExecuteAction.builder()
    24. .setCreator("LinkisClient-Test") //creator,请求Linkis的客户端的系统名,用于做系统级隔离
    25. .addExecuteCode("show tables") //ExecutionCode 请求执行的代码
    26. .setEngineType(EngineType.SPARK) // 希望请求的Linkis的执行引擎类型,如Spark hive等
    27. .setUser("${username}").build()) //User,请求用户;用于做用户级多租户隔离
    28. println("execId: " + jobExecuteResult.getExecID + ", taskId: " + jobExecuteResult.taskID)
    29. // 4. 获取脚本的执行状态
    30. var status = client.status(jobExecuteResult)
    31. while(!status.isCompleted) {
    32. // 5. 获取脚本的执行进度
    33. val progress = client.progress(jobExecuteResult)
    34. val progressInfo = if(progress.getProgressInfo != null) progress.getProgressInfo.toList else List.empty
    35. println("progress: " + progress.getProgress + ", progressInfo: " + progressInfo)
    36. Utils.sleepQuietly(500)
    37. status = client.status(jobExecuteResult)
    38. }
    39. // 6. 获取脚本的Job信息
    40. val jobInfo = client.getJobInfo(jobExecuteResult)
    41. // 7. 获取结果集列表(如果用户一次提交多个SQL,会产生多个结果集)
    42. val resultSet = jobInfo.getResultSetList(client).head
    43. // 8. 通过一个结果集信息,获取具体的结果集
    44. val fileContents = client.resultSet(ResultSetAction.builder().setPath(resultSet).setUser(jobExecuteResult.getUser).build()).getFileContent
    45. println("fileContents: " + fileContents)
    46. IOUtils.closeQuietly(client)
    47. }