软件开发定制定制【Spark】spark对mysql的操作

目录


一、前言

        软件开发定制定制使用技术和mysql软件开发定制定制交互的时候往往会遇到软件开发定制定制以下几种情况,需要编写不同的api方式来满足开发过程中的不同需求,这里使用的语言为scala变成语言;

  • 读取mysql满足条件的行记录
  • 整体写入mysql的操作
  • 更新mysql的某行记录

二、使用技巧

1、读取mysql满足条件的行记录

  • 首先需要初始化SparkSession对象,这里比较常用通过连接hive的api获取、同理其他方式获取也可以;

        连接hive获取:

  1. //conHive方法在DBConUtil类中;
  2. def conHive(appName:String):SparkSession={
  3. SparkSession.builder()
  4. //.master("local[2]")
  5. .appName(appName)
  6. .config("spark.sql.broadcastTimeout","36000")
  7. // .config("spark.default.parallelism",1000)
  8. .config("hive.exec.dynamici.partition", true)
  9. .config("hive.exec.dynamic.partition.mode", "nonstrict")
  10. .enableHiveSupport()
  11. .getOrCreate()
  12. }
  13. val spark: SparkSession = DBConUtil.conHive("test")

        其他方式获取:

  1. val spark: SparkSession = SparkSession
  2. .builder()
  3. .appName("test")
  4. .master("local[*]")
  5. .getOrCreate()
  • 然后使用初始化好的SparkSession对象进行mysql数据库数据的读取操作;
  1. val properties = new Properties()
  2. properties.setProperty("user","mysqldb")
  3. properties.setProperty("password","pwd")
  4. val url="jdbc:mysql://ip:3306/test?characterEncoding=utf8&useSSL=true"
  5. var df= spark.read.jdbc(url,"table1",properties)
  6. .select("name","age","sex")
  7. .where("age>20")

2、整体写入mysql的操作

        这里的整体写入mysql的操作的含义是将条件筛选之后的DataFram或着DataSet直接写入mysql,调用的是spark官方提供的api。所以首先要创建出来一个DataFram或者DataSet数据集,接下来就是直接写入;

  1. val properties = new Properties()
  2. properties.setProperty("user", mysqlUser)
  3. properties.setProperty("password", mysqlPwd)
  4. df.repartition(80).write.mode(SaveMode.Append).option("driver","com.mysql.jdbc.Driver")
  5. .jdbc(mysqlUrl, mysqlRetTable, properties)

存储模式主要包含如下几种:

  1. SaveMode.ErrorIfExists【默认】模式,该模式下,如果数据库中已经存在该表,则会直接报异常,导致数据不能存入数据库;
  2. SaveMode.Append 如果表已经存在,则追加在该表中;若该表不存在,则会先创建表,再插入数据;
  3. SaveMode.Overwrite 重写模式,其实质是先将已有的表及其数据全都删除,再重新创建该表,最后插入新的数据;
  4. SaveMode.Ignore 若表不存在,则创建表,并存入数据;在表存在的情况下,直接跳过数据的存储,不会报错。

3、更新mysql的某行记录

        有时候在写spark程序的时候需要对mysql中的单行或者多行的某些字段进行更新操作,spark api并没有提供这些操作,这里需要自己写原生的JDBC操作更新或者批量更新mysql记录;

  1. val connection: Connection = JdbcTemplateUtil.
  2. getConnection("jdbc:mysql://ip:3306/url_analyse?characterEncoding=utf8&useSSL=false",
  3. "mysqldb", "pwd")
  4. JdbcTemplateUtil.executeSql(connection,"insert into test01(id,test) values(?,?)",Array("117","aa"))
  5. //批量插入
  6. // var arrayBuffer = new ArrayBuffer[Array[String]]()
  7. // arrayBuffer += Array("220","bb")
  8. // arrayBuffer += Array("330","cc")
  9. // arrayBuffer += Array("440","dd")
  10. // JdbcTemplateUtil.executeBatchSql(connection,"insert into test01(id,test) values(?,?)",arrayBuffer)
  1. import com.mysql.jdbc.exceptions.jdbc4.CommunicationsException
  2. import java.sql.{Connection, DriverManager}
  3. import scala.collection.mutable.ArrayBuffer
  4. object JdbcTemplateUtil {
  5. /**
  6. * 单条操作
  7. * @param sql
  8. * @param params
  9. */
  10. def executeSql(conn: Connection, sql: String, params: Array[String]): Unit = {
  11. try {
  12. val ps = conn.prepareStatement(sql)
  13. if (params != null) {
  14. for (i <- params.indices)
  15. ps.setString(i + 1, params(i))
  16. }
  17. val update = ps.executeUpdate()
  18. ps.close()
  19. } catch {
  20. case e: Exception => println(">>>Execute Sql Exception..." + e)
  21. }
  22. }
  23. /**
  24. * 批量操作
  25. * @param sql
  26. * @param paramList
  27. */
  28. def executeBatchSql(conn: Connection, sql: String, paramList: ArrayBuffer[Array[String]]): Unit = {
  29. try {
  30. val ps = conn.prepareStatement(sql)
  31. conn.setAutoCommit(false)
  32. for (params: Array[String] <- paramList) {
  33. if (params != null) {
  34. for (i <- params.indices) ps.setString(i + 1, params(i))
  35. ps.addBatch()
  36. }
  37. }
  38. ps.executeBatch()
  39. conn.commit()
  40. ps.close()
  41. conn.close()
  42. } catch {
  43. case e: Exception => println(">>>Execute Batch Sql Exception..." + e)
  44. }
  45. }
  46. /**
  47. * 获取mysql连接
  48. * @param url
  49. * @param user
  50. * @param pwd
  51. * @return
  52. */
  53. def getConnection(url:String,user:String,pwd:String):Connection={
  54. //classOf[com.mysql.cj.jdbc.Driver]
  55. Class.forName("com.mysql.jdbc.Driver")
  56. DriverManager.getConnection(url,user,pwd)
  57. }
  58. }

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发