JDBC 소스를 사용하여 (Py) Spark에서 데이터를 쓰고 읽는 방법은 무엇입니까?
이 질문의 목표는 다음을 문서화하는 것입니다.
PySpark에서 JDBC 연결을 사용하여 데이터를 읽고 쓰는 데 필요한 단계
JDBC 소스의 가능한 문제 및 솔루션을 알고 있습니다.
약간만 변경하면이 메서드는 Scala 및 R을 포함하여 지원되는 다른 언어에서 작동합니다.
데이터 쓰기
애플리케이션을 제출하거나 셸을 시작할 때 적용 가능한 JDBC 드라이버를 포함합니다. 예를 들어 사용할 수 있습니다
--packages
.bin/pyspark --packages group:name:version
또는 결합
driver-class-path
및jars
bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR
이러한 속성은
PYSPARK_SUBMIT_ARGS
JVM 인스턴스가 시작되기 전에 환경 변수를 사용 하거나conf/spark-defaults.conf
설정spark.jars.packages
또는spark.jars
/를 사용 하여 설정할 수도spark.driver.extraClassPath
있습니다.원하는 모드를 선택하십시오. Spark JDBC 기록기는 다음 모드를 지원합니다.
append
:이 : class :의 내용DataFrame
을 기존 데이터에 추가합니다.overwrite
: 기존 데이터를 덮어 씁니다.ignore
: 데이터가 이미있는 경우이 작업을 자동으로 무시합니다.error
(기본 케이스) : 데이터가 이미 존재하는 경우 예외를 발생시킵니다.
Upserts 또는 기타 세분화 된 수정 은 지원되지 않습니다.
mode = ...
다음과 같이 JDBC URI를 준비합니다.
# You can encode credentials in URI or pass # separately using properties argument # of jdbc method or options url = "jdbc:postgresql://localhost/foobar"
(선택 사항) JDBC 인수 사전을 만듭니다.
properties = { "user": "foo", "password": "bar" }
properties
/options
는 지원되는 JDBC 연결 속성 을 설정하는데도 사용할 수 있습니다 .사용하다
DataFrame.write.jdbc
df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)
데이터를 저장합니다 (자세한 내용
pyspark.sql.DataFrameWriter
은 참조).
알려진 문제 :
--packages
(java.sql.SQLException: No suitable driver found for jdbc: ...
)를 사용하여 드라이버가 포함 된 경우 적합한 드라이버를 찾을 수 없습니다.가정 추가 할 수있는이 문제를 해결 할 드라이버 버전 불일치가없는
driver
받는 클래스를properties
. 예를 들면 :properties = { ... "driver": "org.postgresql.Driver" }
사용
df.write.format("jdbc").options(...).save()
하면 다음과 같은 결과가 발생할 수 있습니다.java.lang.RuntimeException : org.apache.spark.sql.execution.datasources.jdbc.DefaultSource는 선택으로 테이블 작성을 허용하지 않습니다.
솔루션을 알 수 없습니다.
Pyspark 1.3에서는 Java 메서드를 직접 호출 할 수 있습니다.
df._jdf.insertIntoJDBC(url, "baz", True)
데이터 읽기
- 데이터 쓰기의 1-4 단계를 따릅니다.
사용
sqlContext.read.jdbc
:sqlContext.read.jdbc(url=url, table="baz", properties=properties)
또는
sqlContext.read.format("jdbc")
:(sqlContext.read.format("jdbc") .options(url=url, dbtable="baz", **properties) .load())
알려진 문제 및 문제점 :
- 적합한 드라이버를 찾을 수 없습니다. 참조 : 데이터 쓰기
Spark SQL은 모든 술어를 푸시 다운 할 수는 없지만 JDBC 소스로 술어 푸시 다운을 지원합니다. 또한 제한이나 집계를 위임하지 않습니다. 가능한 해결 방법은
dbtable
/table
인수를 유효한 하위 쿼리 로 바꾸는 것 입니다. 예를 들어 :기본적으로 JDBC 데이터 소스는 단일 실행기 스레드를 사용하여 데이터를 순차적으로로드합니다. 분산 된 데이터로드를 보장하려면 다음을 수행 할 수 있습니다.
- 분할 제공
column
(해야IntegeType
),lowerBound
,upperBound
,numPartitions
. predicates
원하는 파티션마다 하나씩 상호 배타적 인 술어 목록을 제공하십시오 .
보다:
- 분할 제공
In a distributed mode (with partitioning column or predicates) each executor operates in its own transaction. If the source database is modified at the same time there is no guarantee that the final view will be consistent.
Where to find suitable drivers:
Maven Repository (to obtain required coordinates for
--packages
select desired version and copy data from a Gradle tab in a formcompile-group:name:version
substituting respective fields) or Maven Central Repository:
Other options
Depending on the database specialized source might exist, and be preferred in some cases:
- Greenplum - Pivotal Greenplum-Spark Connector
- Apache Phoenix - Apache Spark Plugin
- Microsoft SQL Server - Spark connector for Azure SQL Databases and SQL Server
- Amazon Redshift - Databricks Redshift connector (current versions available only in a proprietary Databricks Runtime. Discontinued open source version, available on GitHub).
Download mysql-connector-java driver and keep in spark jar folder,observe the bellow python code here writing data into "acotr1",we have to create acotr1 table structure in mysql database
spark = SparkSession.builder.appName("prasadad").master('local').config('spark.driver.extraClassPath','D:\spark-2.1.0-bin-hadoop2.7\jars\mysql-connector-java-5.1.41-bin.jar').getOrCreate()
sc = spark.sparkContext
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format("jdbc").options(url="jdbc:mysql://localhost:3306/sakila",driver="com.mysql.jdbc.Driver",dbtable="actor",user="root",password="Ramyam01").load()
mysql_url="jdbc:mysql://localhost:3306/sakila?user=root&password=Ramyam01"
df.write.jdbc(mysql_url,table="actor1",mode="append")
Refer this link to download the jdbc for postgres and follow the steps to download jar file
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/exercises/spark-exercise-dataframe-jdbc-postgresql.html jar file will be download in the path like this. "/home/anand/.ivy2/jars/org.postgresql_postgresql-42.1.1.jar"
If your spark version is 2
from pyspark.sql import SparkSession
spark = SparkSession.builder
.appName("sparkanalysis")
.config("spark.driver.extraClassPath",
"/home/anand/.ivy2/jars/org.postgresql_postgresql42.1.1.jar")
.getOrCreate()
//for localhost database//
pgDF = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:postgres") \
.option("dbtable", "public.user_emp_tab") \
.option("user", "postgres") \
.option("password", "Jonsnow@100") \
.load()
print(pgDF)
pgDF.filter(pgDF["user_id"]>5).show()
save the file as python and run "python respectivefilename.py"
ReferenceURL : https://stackoverflow.com/questions/30983982/how-to-use-jdbc-source-to-write-and-read-data-in-pyspark
'IT story' 카테고리의 다른 글
Azure의 정적 콘텐츠 (svg, woff, ttf)에 대한 404 (0) | 2020.12.30 |
---|---|
Akka는 Play에서 어떻게 사용 되나요? (0) | 2020.12.30 |
LINQ : 점 표기법과 쿼리 식 (0) | 2020.12.30 |
Model View Presenter 란 무엇입니까? (0) | 2020.12.30 |
Django : 기본 키를 지정하지 않고 조명기를 만드시겠습니까? (0) | 2020.12.30 |