Skip to content

Commit 1f4bc75

Browse files
author
milos.colic
committed
Base code for fetching files to local.
1 parent 6e6f74e commit 1f4bc75

4 files changed

Lines changed: 222 additions & 0 deletions

File tree

‎pom.xml‎

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<groupId>org.example</groupId>
8+
<artifactId>DeltaSharingJavaConnector</artifactId>
9+
<version>1.0-SNAPSHOT</version>
10+
11+
<properties>
12+
<maven.compiler.source>11</maven.compiler.source>
13+
<maven.compiler.target>11</maven.compiler.target>
14+
</properties>
15+
16+
<dependencies>
17+
<dependency>
18+
<groupId>io.delta</groupId>
19+
<artifactId>delta-sharing-spark_2.12</artifactId>
20+
<version>0.2.0</version>
21+
</dependency>
22+
<dependency>
23+
<groupId>com.fasterxml.jackson.core</groupId>
24+
<artifactId>jackson-databind</artifactId>
25+
<version>2.10.0</version>
26+
</dependency>
27+
<dependency>
28+
<groupId>org.apache.httpcomponents</groupId>
29+
<artifactId>httpclient</artifactId>
30+
<version>4.5.13</version>
31+
</dependency>
32+
<dependency>
33+
<groupId>org.apache.parquet</groupId>
34+
<artifactId>parquet-hadoop</artifactId>
35+
<version>1.12.0</version>
36+
</dependency>
37+
<dependency>
38+
<groupId>org.apache.spark</groupId>
39+
<artifactId>spark-core_2.12</artifactId>
40+
<version>3.1.1</version>
41+
<exclusions>
42+
<exclusion>
43+
<groupId>com.fasterxml.jackson.core</groupId>
44+
<artifactId>jackson-databind</artifactId>
45+
</exclusion>
46+
<exclusion>
47+
<groupId>com.fasterxml.jackson.module</groupId>
48+
<artifactId>jackson-module-scala</artifactId>
49+
</exclusion>
50+
</exclusions>
51+
</dependency>
52+
</dependencies>
53+
54+
<build>
55+
<plugins>
56+
<plugin>
57+
<groupId>org.apache.maven.plugins</groupId>
58+
<artifactId>maven-assembly-plugin</artifactId>
59+
<version>3.3.0</version>
60+
<configuration>
61+
<descriptorRefs>
62+
<descriptorRef>jar-with-dependencies</descriptorRef>
63+
</descriptorRefs>
64+
</configuration>
65+
<executions>
66+
<execution>
67+
<id>assemble-all</id>
68+
<phase>package</phase>
69+
<goals>
70+
<goal>single</goal>
71+
</goals>
72+
</execution>
73+
</executions>
74+
</plugin>
75+
</plugins>
76+
</build>
77+
78+
</project>
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package io.delta.sharing.java;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import io.delta.sharing.spark.DeltaSharingProfile;
5+
import io.delta.sharing.spark.DeltaSharingProfileProvider;
6+
7+
/**
8+
* Load [[DeltaSharingProfile]] from a JSON string.
9+
*/
10+
public class DeltaSharingJSONProvider implements DeltaSharingProfileProvider {
11+
String configuration;
12+
DeltaSharingProfile profile;
13+
14+
public DeltaSharingJSONProvider(String conf) {
15+
try {
16+
configuration = conf;
17+
ObjectMapper mapper = new ObjectMapper();
18+
DeltaSharingProfileAdaptor profileAdaptor = mapper.readValue(configuration, DeltaSharingProfileAdaptor.class);
19+
profile = profileAdaptor.toProfile();
20+
} catch (Exception e) {
21+
System.out.print(e);
22+
}
23+
if (profile.shareCredentialsVersion().isEmpty()) {
24+
throw new IllegalArgumentException(
25+
"Cannot find the 'shareCredentialsVersion' field in the profile file");
26+
}
27+
28+
if ((int) profile.shareCredentialsVersion().get() > DeltaSharingProfile.CURRENT()) {
29+
throw new IllegalArgumentException(
30+
"'shareCredentialsVersion' in the profile is " +
31+
"${profile.shareCredentialsVersion.get} which is too new. The current release " +
32+
"supports version ${DeltaSharingProfile.CURRENT} and below. Please upgrade to a newer " +
33+
"release.");
34+
}
35+
if (profile.endpoint() == null) {
36+
throw new IllegalArgumentException("Cannot find the 'endpoint' field in the profile file");
37+
}
38+
if (profile.bearerToken() == null) {
39+
throw new IllegalArgumentException("Cannot find the 'bearerToken' field in the profile file");
40+
}
41+
}
42+
43+
44+
@Override
45+
public DeltaSharingProfile getProfile() {
46+
return profile;
47+
}
48+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package io.delta.sharing.java;
2+
3+
import io.delta.sharing.spark.DeltaSharingProfile;
4+
import scala.Option;
5+
6+
//Suppressed since getters are used only by Jackson via reflection
7+
@SuppressWarnings("unused")
8+
public class DeltaSharingProfileAdaptor {
9+
int shareCredentialsVersion;
10+
String endpoint;
11+
String bearerToken;
12+
13+
public DeltaSharingProfileAdaptor() {
14+
}
15+
16+
public int getShareCredentialsVersion() {
17+
return shareCredentialsVersion;
18+
}
19+
20+
public void setShareCredentialsVersion(int shareCredentialsVersion) {
21+
this.shareCredentialsVersion = shareCredentialsVersion;
22+
}
23+
24+
public String getEndpoint() {
25+
return endpoint;
26+
}
27+
28+
public void setEndpoint(String endpoint) {
29+
this.endpoint = endpoint;
30+
}
31+
32+
public String getBearerToken() {
33+
return bearerToken;
34+
}
35+
36+
public void setBearerToken(String bearerToken) {
37+
this.bearerToken = bearerToken;
38+
}
39+
40+
public DeltaSharingProfile toProfile() {
41+
return DeltaSharingProfile.apply(Option.apply(shareCredentialsVersion), endpoint, bearerToken);
42+
}
43+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package io.delta.sharing.java;
2+
3+
import io.delta.sharing.spark.DeltaSharingFileSystem;
4+
import io.delta.sharing.spark.DeltaSharingFileSystem$;
5+
import io.delta.sharing.spark.DeltaSharingProfileProvider;
6+
import io.delta.sharing.spark.DeltaSharingRestClient;
7+
import io.delta.sharing.spark.model.AddFile;
8+
import io.delta.sharing.spark.model.DeltaTableFiles;
9+
import io.delta.sharing.spark.model.Table;
10+
import org.apache.hadoop.conf.Configuration;
11+
import org.apache.hadoop.fs.FSDataInputStream;
12+
import org.apache.hadoop.fs.FSInputStream;
13+
import org.apache.hadoop.fs.Path;
14+
import org.apache.parquet.hadoop.ParquetFileReader;
15+
import org.apache.parquet.io.InputFile;
16+
import scala.collection.JavaConverters;
17+
import scala.collection.Seq;
18+
import scala.collection.Seq$;
19+
20+
import java.io.FileInputStream;
21+
import java.io.IOException;
22+
import java.net.URI;
23+
import java.nio.file.Files;
24+
import java.nio.file.Paths;
25+
import java.util.Arrays;
26+
import java.util.List;
27+
28+
29+
public class FirstTest {
30+
public static void main(String[] args) throws IOException {
31+
DeltaSharingProfileProvider profileProvider = new DeltaSharingJSONProvider("{\n" +
32+
" \"shareCredentialsVersion\": 1,\n" +
33+
" \"endpoint\": \"https://sharing.delta.io/delta-sharing/\",\n" +
34+
" \"bearerToken\": \"faaie590d541265bcab1f2de9813274bf233\"\n" +
35+
"}");
36+
DeltaSharingRestClient httpClient = new DeltaSharingRestClient(profileProvider, 120, 4, false);
37+
Seq<Table> tables = httpClient.listAllTables();
38+
List<Table> tableList = JavaConverters.seqAsJavaList(tables);
39+
for (Table table : tableList) {
40+
System.out.println(table.name());
41+
}
42+
@SuppressWarnings("unchecked") Seq<String> nil = (Seq<String>) Seq$.MODULE$.empty();
43+
DeltaTableFiles deltaTableFiles = httpClient.getFiles(tableList.get(0), nil, scala.None$.apply(0));
44+
List<AddFile> files = JavaConverters.seqAsJavaList(deltaTableFiles.files());
45+
DeltaSharingFileSystem fs = new DeltaSharingFileSystem();
46+
fs.setConf(new Configuration());
47+
for (AddFile file : files) {
48+
System.out.println(file);
49+
FSDataInputStream stream = fs.open(DeltaSharingFileSystem.createPath(URI.create(file.url()), file.size()), 1024);
50+
Files.write(Paths.get("/Users/milos.colic/IdeaProjects/DeltaSharingJavaConnector/target/" + file.id() + ".parquet"), stream.readAllBytes());
51+
}
52+
}
53+
}

0 commit comments

Comments
 (0)