Java库为Postgres编写二进制格式COPY?

有没有人遇到过Java库(或只是一些代码)来编写Postgres的COPY命令所使用的binary格式?

它看起来很简单,但如果有人已经找到了正确的元组数据格式,我也可以从那里开始。

实际上,即使只是描述所有数据类型的格式也会有所帮助。

谢谢。

您可以尝试PgBulkInsert ,它实现了PostgreSQL的二进制复制协议:

它也可以从Maven Central Repository获得。

免责声明:我是项目作者。

PostgreSQL二进制复制协议

我不想简单地宣传我的项目,而是写下协议。

首先,我编写了一个类PgBinaryWriter ,它包装了一个DataOutputStream并具有编写二进制协议头的方法,这是一种启动新行的方法(二进制复制协议要求您为每行写入列数)插入)和一个write方法,它采用IValueHandler来编写给定的Java类型。

PgBinaryWriter实现了AutoClosable ,因为在刷新和关闭流之前必须向流写入-1

IValueHandler接受DataOutputStream和值。 它负责使用PostgreSQL二进制协议格式编写给定值。

PgBinaryWriter

 // Copyright (c) Philipp Wagner. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. package de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql; import de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.exceptions.BinaryWriteFailedException; import de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql.handlers.IValueHandler; import java.io.BufferedOutputStream; import java.io.DataOutputStream; import java.io.OutputStream; public class PgBinaryWriter implements AutoCloseable { /** The ByteBuffer to write the output. */ private transient DataOutputStream buffer; public PgBinaryWriter() { } public void open(final OutputStream out) { buffer = new DataOutputStream(new BufferedOutputStream(out)); writeHeader(); } private void writeHeader() { try { // 11 bytes required header buffer.writeBytes("PGCOPY\n\377\r\n\0"); // 32 bit integer indicating no OID buffer.writeInt(0); // 32 bit header extension area length buffer.writeInt(0); } catch(Exception e) { throw new BinaryWriteFailedException(e); } } public void startRow(int numColumns) { try { buffer.writeShort(numColumns); } catch(Exception e) { throw new BinaryWriteFailedException(e); } } public  void write(final IValueHandler handler, final TTargetType value) { handler.handle(buffer, value); } @Override public void close() { try { buffer.writeShort(-1); buffer.flush(); buffer.close(); } catch(Exception e) { throw new BinaryWriteFailedException(e); } } } 

ValueHandler

IValueHandler是一个简单的接口,它有一个handle方法来获取DataOutputStream和一个值。

 // Copyright (c) Philipp Wagner. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. package de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql.handlers; import java.io.DataOutputStream; import java.lang.reflect.Type; public interface IValueHandler extends ValueHandler { void handle(DataOutputStream buffer, final TTargetType value); Type getTargetType(); } 

了解协议非常重要,当值为null时,必须写入-1 。 为此,我编写了一个处理案例的抽象基类。

 // Copyright (c) Philipp Wagner. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. package de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql.handlers; import de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.exceptions.BinaryWriteFailedException; import java.io.DataOutputStream; public abstract class BaseValueHandler implements IValueHandler { @Override public void handle(DataOutputStream buffer, final T value) { try { if (value == null) { buffer.writeInt(-1); return; } internalHandle(buffer, value); } catch (Exception e) { throw new BinaryWriteFailedException(e); } } protected abstract void internalHandle(DataOutputStream buffer, final T value) throws Exception; } 

然后可以实现各种Java类型的处理程序。 这是long的例子。 您可以在GitHub存储库( 处理程序 )中找到其他实现。

 // Copyright (c) Philipp Wagner. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. package de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql.handlers; import java.io.DataOutputStream; import java.lang.reflect.Type; public class LongValueHandler extends BaseValueHandler { @Override protected void internalHandle(DataOutputStream buffer, final Long value) throws Exception { buffer.writeInt(8); buffer.writeLong(value); } @Override public Type getTargetType() { return Long.class; } } 

使用PgBinaryWriter

现在终于连接部件了。 请注意,我已经抽象了更多的部分。 可能有必要在代码中查找更多实现细节。

 public abstract class PgBulkInsert { // ... public void saveAll(PGConnection connection, Stream entities) throws SQLException { CopyManager cpManager = connection.getCopyAPI(); CopyIn copyIn = cpManager.copyIn(getCopyCommand()); int columnCount = columns.size(); try (PgBinaryWriter bw = new PgBinaryWriter()) { // Wrap the CopyOutputStream in our own Writer: bw.open(new PGCopyOutputStream(copyIn)); // Insert all entities: entities.forEach(entity -> { // Start a New Row: bw.startRow(columnCount); // Insert the Column Data: columns.forEach(column -> { try { column.getWrite().invoke(bw, entity); } catch (Exception e) { throw new SaveEntityFailedException(e); } }); }); } } private String getCopyCommand() { String commaSeparatedColumns = columns.stream() .map(x -> x.columnName) .collect(Collectors.joining(", ")); return String.format("COPY %1$s(%2$s) FROM STDIN BINARY", table.GetFullQualifiedTableName(), commaSeparatedColumns); } } 

PgBulkInsert

PgBulkInsert支持以下PostgreSQL数据类型。

  • 数字类型
    • SMALLINT
    • 整数
    • BIGINT
    • 真实
    • 双精度
  • 日期/时间类型
    • 时间戳
    • 日期
  • 角色类型
    • 文本
  • 布尔类型
    • 布尔
  • 二进制数据类型
    • BYTEA
  • 网络地址类型
    • inet(IPv4,IPv6)
  • UUID类型
    • UUID

基本用法

想象一下,应该将大量人员批量插入PostgreSQL数据库。 每个Person都有一个名字,一个姓氏和一个生日。

数据库表

PostgreSQL数据库中的表可能如下所示:

 CREATE TABLE sample.unit_test ( first_name text, last_name text, birth_date date ); 

领域模型

应用程序中的域模型可能如下所示:

 private class Person { private String firstName; private String lastName; private LocalDate birthDate; public Person() {} public String getFirstName() { return firstName; } public void setFirstName(String firstName) { this.firstName = firstName; } public String getLastName() { return lastName; } public void setLastName(String lastName) { this.lastName = lastName; } public LocalDate getBirthDate() { return birthDate; } public void setBirthDate(LocalDate birthDate) { this.birthDate = birthDate; } } 

批量插入器

然后,您必须实现PgBulkInsert ,它定义表和域模型之间的映射。

 public class PersonBulkInserter extends PgBulkInsert { public PersonBulkInserter() { super("sample", "unit_test"); MapString("first_name", Person::getFirstName); MapString("last_name", Person::getLastName); MapDate("birth_date", Person::getBirthDate); } } 

使用批量插入器

最后我们可以编写一个unit testing来将100000个人插入到数据库中。 您可以在GitHub上找到整个unit testing: IntegrationTest.java 。

 @Test public void bulkInsertPersonDataTest() throws SQLException { // Create a large list of Persons: List persons = getPersonList(100000); // Create the BulkInserter: PersonBulkInserter personBulkInserter = new PersonBulkInserter(); // Now save all entities of a given stream: personBulkInserter.saveAll(PostgreSqlUtils.getPGConnection(connection), persons.stream()); // And assert all have been written to the database: Assert.assertEquals(100000, getRowCount()); } private List getPersonList(int numPersons) { List persons = new ArrayList<>(); for (int pos = 0; pos < numPersons; pos++) { Person p = new Person(); p.setFirstName("Philipp"); p.setLastName("Wagner"); p.setBirthDate(LocalDate.of(1986, 5, 12)); persons.add(p); } return persons; } 

您是否考虑过使用JDBC驱动程序中的CopyManager ? 否则,您可以从QueryExecutorImpl派生实现。