Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## 8.1.13 - 2025-11-13
### Changed
- Upgrade maven shade plugin to `3.6.1` (was `3.1.1`).
- Upgrade AWS Java SDK in `apiary-gluesync-listener` to `1.12.792` (was `1.12.276`)
- Send view original texts in `apiary-gluesync-listener`.
- Add possibility to only sync views in `apiary-gluesync-listener` CLI/
- Shade micrometer libraries into glue-sync CLI.

## 8.1.12 - 2025-09-15
### Added
- GlueSyncCli with fat jar for on-demand syncing.
Expand Down
19 changes: 18 additions & 1 deletion hive-event-listeners/apiary-gluesync-listener/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
<artifactId>apiary-gluesync-listener</artifactId>
<name>Apiary GlueSync Listener</name>
<description>Glue Sync Listener for Apiary that replays metadata events in AWS Glue catalog</description>

<properties>
<aws-java-sdk.version>1.12.792</aws-java-sdk.version>
</properties>

<dependencies>
<dependency>
<groupId>com.expediagroup.apiary</groupId>
Expand Down Expand Up @@ -137,7 +142,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<version>3.6.1</version>
<executions>
<execution>
<id>shade-all</id>
Expand All @@ -153,6 +158,8 @@
<include>com.amazonaws:*</include>
<include>com.expediagroup.apiary:*</include>
<include>org.apache.httpcomponents:*</include>
<include>io.micrometer:*</include>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

relocate as well

<include>io.prometheus:*</include>
</includes>
<excludes>
<exclude>ch.qos.reload4j:reload4j</exclude>
Expand All @@ -170,6 +177,16 @@
<pattern>org.apache.http</pattern>
<shadedPattern>${shade.prefix}.org.apache.http</shadedPattern>
</relocation>
<!-- Micrometer and prometheus are not part of Hive Metastore libraries.
Ensuring version take precedence -->
<relocation>
<pattern>io.micrometer</pattern>
<shadedPattern>${shade.prefix}.io.micrometer</shadedPattern>
</relocation>
<relocation>
<pattern>io.prometheus</pattern>
<shadedPattern>${shade.prefix}.io.prometheus</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.expediagroup.apiary.extensions.gluesync.cli;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

Expand Down Expand Up @@ -47,6 +48,7 @@
import com.hotels.hcommon.hive.metastore.iterator.PartitionIterator;

public class GlueSyncCli {

private static final Logger logger = LoggerFactory.getLogger(GlueSyncCli.class);

private static final String THRIFT_CONNECTION_URI = System.getenv("THRIFT_CONNECTION_URI");
Expand Down Expand Up @@ -109,8 +111,11 @@ public void syncAll(CommandLine cmd) throws TException {
boolean continueOnError = cmd.hasOption("continueOnError");
boolean deleteGluePartitions = !cmd.hasOption("keep-glue-partitions");

logger.debug("Additional parameters: continueOnError={}, deleteGluePartitions={}", continueOnError,
deleteGluePartitions);
String syncTypesFlag = cmd.getOptionValue("sync-types");
List<String> syncTypes = syncTypesFlag == null ? null : Arrays.asList(syncTypesFlag.split(","));

logger.debug("Additional parameters: continueOnError={}, deleteGluePartitions={}, syncTypes={}",
continueOnError, deleteGluePartitions, syncTypesFlag);

boolean hadError = false;
for (String dbName : metastoreClient.getAllDatabases()) {
Expand All @@ -120,7 +125,7 @@ public void syncAll(CommandLine cmd) throws TException {
if (tableName.matches(tableRegex)) {
try {
logger.info("Syncing table: {} in database: {}", tableName, dbName);
syncTable(dbName, tableName, deleteGluePartitions, verbose);
syncTable(dbName, tableName, deleteGluePartitions, syncTypes, verbose);
} catch (Exception e) {
hadError = true;
logger.error("Error syncing table: {} in database: {}: {}", tableName, dbName, e.getMessage());
Expand All @@ -139,8 +144,8 @@ public void syncAll(CommandLine cmd) throws TException {
}
}

private void syncTable(String dbName, String tableName, boolean deleteGluePartitions, boolean verbose)
throws TException {
private void syncTable(String dbName, String tableName, boolean deleteGluePartitions, List<String> syncTypes,
boolean verbose) throws TException {
Database database = metastoreClient.getDatabase(dbName);

if (!glueDatabaseService.exists(database)) {
Expand All @@ -150,6 +155,14 @@ private void syncTable(String dbName, String tableName, boolean deleteGluePartit

Table table = metastoreClient.getTable(dbName, tableName);

if (syncTypes != null) {
String type = table.getTableType();
if (type != null && !syncTypes.contains(type)) {
logger.info("Table {}.{} is {}, skipping as syncTypes flag {} is active", dbName, tableName, type, syncTypes);
return;
}
}

CreateTableEvent createTableEvent = new CreateTableEvent(table, true, null);
apiaryGlueSync.onCreateTable(createTableEvent);

Expand All @@ -176,5 +189,4 @@ protected Iterator<Partition> createPartitionIterator(IMetaStoreClient metastore
throws org.apache.hadoop.hive.metastore.api.MetaException, org.apache.thrift.TException {
return new PartitionIterator(metastoreClient, table, DEFAULT_PARTITION_BATCH_SIZE);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ private static CommandLineParser getParser(Options options) {
options.addOption(new Option("c", "continueOnError", false, "Continue on error (default: false)"));
options.addOption(new Option(null, "keep-glue-partitions", false,
"If true, will keep glue partitions even if there is no corresponding hive partition. If false will delete them (default: false)"));
options.addOption(new Option(null, "sync-types", true,
"List of table types to sync. If non specified it will be sync all table types. Example: sync-types=MANAGED_TABLE,EXTERNAL_TABLE. Possible types: VIRTUAL_VIEW,MANAGED_TABLE,EXTERNAL_TABLE"));

CommandLineParser parser = new DefaultParser();
return parser;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ public TableInput transformTable(final Table table) {
.withPartitionKeys(partitionKeys)
.withRetention(table.getRetention())
.withStorageDescriptor(sd)
.withTableType(table.getTableType());
.withTableType(table.getTableType())
.withViewOriginalText(table.getViewOriginalText())
.withViewExpandedText(table.getViewExpandedText());
}

public PartitionInput transformPartition(final Partition partition) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ private Options createOptions() {
options.addOption(new Option("h", "help", false, "Print usage information"));
options.addOption(new Option(null, "continueOnError", false, "Continue processing on errors"));
options.addOption(new Option(null, "keep-glue-partitions", false, "Keep existing Glue partitions"));
options.addOption(new Option(null, "sync-types", true, "Choose what table type to sync."));

return options;
}
Expand Down Expand Up @@ -170,7 +171,6 @@ public void testSyncAllWithMatchingDatabaseAndTable() throws Exception {
verify(mockApiaryGlueSync, never()).onCreateDatabase(any(CreateDatabaseEvent.class));
verify(mockApiaryGlueSync, times(4)).onCreateTable(any(CreateTableEvent.class));
verify(mockGluePartitionService, times(4)).synchronizePartitions(any(), any(), anyBoolean(), anyBoolean());

}

@Test
Expand Down Expand Up @@ -209,7 +209,6 @@ public void testSyncAllWithVerboseOption() throws Exception {
verify(mockApiaryGlueSync, never()).onCreateDatabase(any(CreateDatabaseEvent.class));
verify(mockApiaryGlueSync, times(1)).onCreateTable(any(CreateTableEvent.class));
verify(mockGluePartitionService, times(1)).synchronizePartitions(any(), any(), anyBoolean(), eq(true));

}

@Test
Expand Down Expand Up @@ -262,7 +261,6 @@ public void testSyncAllWithNoMatchingDatabases() throws Exception {
// Assert
verify(mockApiaryGlueSync, never()).onCreateDatabase(any(CreateDatabaseEvent.class));
verify(mockApiaryGlueSync, never()).onCreateTable(any(CreateTableEvent.class));

}

@Test(expected = RuntimeException.class)
Expand Down Expand Up @@ -339,7 +337,7 @@ public void testSyncAllWithKeepGluePartitionsOption() throws Exception {
// Arrange - Test that deleteGluePartitions is false when keep-glue-partitions
// is set
String[] args = { "--database-name-regex", "test_db.*", "--table-name-regex", "test_table.*",
"--keep-glue-partitions" };
"--keep-glue-partitions" };
CommandLine cmd = new DefaultParser().parse(createOptions(), args);

List<String> databases = Arrays.asList("test_db1");
Expand Down Expand Up @@ -369,6 +367,104 @@ public void testSyncAllWithKeepGluePartitionsOption() throws Exception {
verify(mockGluePartitionService, times(1)).synchronizePartitions(any(), any(), eq(false), anyBoolean());
}

@Test
public void testSyncAllWithSyncOnlyViewsOption_onView() throws Exception {
String[] args = { "--database-name-regex", "test_db.*", "--table-name-regex", "test_table.*",
"--sync-types", "VIRTUAL_VIEW" };
CommandLine cmd = new DefaultParser().parse(createOptions(), args);

List<String> databases = Arrays.asList("test_db1");
List<String> tables = Arrays.asList("test_table1");

Database mockDatabase = new Database();
mockDatabase.setName("test_db1");

Table mockTable = new Table();
mockTable.setDbName("test_db1");
mockTable.setTableName("test_table1");
mockTable.setTableType("VIRTUAL_VIEW");
Map<String, String> tableParams = new HashMap<>();
mockTable.setParameters(tableParams);

when(mockMetastoreClient.getAllDatabases()).thenReturn(databases);
when(mockMetastoreClient.getAllTables("test_db1")).thenReturn(tables);
when(mockMetastoreClient.getDatabase(anyString())).thenReturn(mockDatabase);
when(mockMetastoreClient.getTable(anyString(), anyString())).thenReturn(mockTable);
when(mockIsIcebergTablePredicate.test(any())).thenReturn(false);
when(mockMetastoreClient.listPartitions(anyString(), anyString(), anyShort())).thenReturn(Arrays.asList());

// Act
glueSyncCli.syncAll(cmd);

// Assert - Verify that synchronizePartitions is called with
// deleteGluePartitions=false
verify(mockGluePartitionService, times(1)).synchronizePartitions(any(), any(), anyBoolean(), anyBoolean());
}

@Test
public void testSyncAllWithSyncOnlyViewsOption_onNormalTable() throws Exception {
String[] args = { "--database-name-regex", "test_db.*", "--table-name-regex", "test_table.*",
"--sync-types", "VIRTUAL_VIEW" };
CommandLine cmd = new DefaultParser().parse(createOptions(), args);

List<String> databases = Arrays.asList("test_db1");
List<String> tables = Arrays.asList("test_table1");

Database mockDatabase = new Database();
mockDatabase.setName("test_db1");

Table mockTable = new Table();
mockTable.setDbName("test_db1");
mockTable.setTableName("test_table1");
mockTable.setTableType("EXTERNAL_TABLE");
Map<String, String> tableParams = new HashMap<>();
mockTable.setParameters(tableParams);

when(mockMetastoreClient.getAllDatabases()).thenReturn(databases);
when(mockMetastoreClient.getAllTables("test_db1")).thenReturn(tables);
when(mockMetastoreClient.getDatabase(anyString())).thenReturn(mockDatabase);
when(mockMetastoreClient.getTable(anyString(), anyString())).thenReturn(mockTable);

// Act
glueSyncCli.syncAll(cmd);

verify(mockGluePartitionService, times(0)).synchronizePartitions(any(), any(), anyBoolean(), anyBoolean());
}

@Test
public void testSyncAllWithSyncOnlyViewsOption_onViewAndTable() throws Exception {
String[] args = { "--database-name-regex", "test_db.*", "--table-name-regex", "test_table.*",
"--sync-types", "VIRTUAL_VIEW,EXTERNAL_TABLE" };
CommandLine cmd = new DefaultParser().parse(createOptions(), args);

List<String> databases = Arrays.asList("test_db1");
List<String> tables = Arrays.asList("test_table1");

Database mockDatabase = new Database();
mockDatabase.setName("test_db1");

Table mockTable = new Table();
mockTable.setDbName("test_db1");
mockTable.setTableName("test_table1");
mockTable.setTableType("VIRTUAL_VIEW");
Map<String, String> tableParams = new HashMap<>();
mockTable.setParameters(tableParams);

when(mockMetastoreClient.getAllDatabases()).thenReturn(databases);
when(mockMetastoreClient.getAllTables("test_db1")).thenReturn(tables);
when(mockMetastoreClient.getDatabase(anyString())).thenReturn(mockDatabase);
when(mockMetastoreClient.getTable(anyString(), anyString())).thenReturn(mockTable);
when(mockIsIcebergTablePredicate.test(any())).thenReturn(false);
when(mockMetastoreClient.listPartitions(anyString(), anyString(), anyShort())).thenReturn(Arrays.asList());

// Act
glueSyncCli.syncAll(cmd);

// Assert - Verify that synchronizePartitions is called with
// deleteGluePartitions=false
verify(mockGluePartitionService, times(1)).synchronizePartitions(any(), any(), anyBoolean(), anyBoolean());
}

@Test
public void testSyncAllWithDefaultPartitionDeletion() throws Exception {
// Arrange - Test that deleteGluePartitions is true by default (when
Expand Down Expand Up @@ -443,7 +539,6 @@ public void testSyncAllWithLargePartitionBatchHandling() throws Exception {
// Assert
verify(mockGluePartitionService, times(1)).synchronizePartitions(any(), eq(largePartitionList), anyBoolean(),
eq(true));

}

@Test(expected = RuntimeException.class)
Expand Down Expand Up @@ -498,7 +593,6 @@ public void testSyncAllWithNonMatchingTableRegex() throws Exception {
verify(mockApiaryGlueSync, never()).onCreateDatabase(any(CreateDatabaseEvent.class));
verify(mockApiaryGlueSync, never()).onCreateTable(any(CreateTableEvent.class));
verify(mockGluePartitionService, never()).synchronizePartitions(any(), any(), anyBoolean(), anyBoolean());

}

@Test
Expand All @@ -516,7 +610,6 @@ public void testSyncAllWithEmptyDatabaseList() throws Exception {
verify(mockApiaryGlueSync, never()).onCreateDatabase(any(CreateDatabaseEvent.class));
verify(mockApiaryGlueSync, never()).onCreateTable(any(CreateTableEvent.class));
verify(mockGluePartitionService, never()).synchronizePartitions(any(), any(), anyBoolean(), anyBoolean());

}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,30 @@ public void onCreateIcebergTable() throws MetaException {
assertThat(toList(createTableRequest.getTableInput().getStorageDescriptor().getColumns()), is(asList(colNames)));
}

@Test
public void onCreateHiveView() throws MetaException {
CreateTableEvent event = mock(CreateTableEvent.class);
when(event.getStatus()).thenReturn(true);

Table table = simpleHiveTable(simpleSchema(), Collections.emptyList());
table.setTableType("VIRTUAL_VIEW");
table.setViewOriginalText("SELECT * FROM some_table");
table.setViewExpandedText("SELECT * FROM some_table");
when(event.getTable()).thenReturn(table);

glueSync.onCreateTable(event);

verify(glueClient).createTable(createTableRequestCaptor.capture());
verify(metricService).incrementCounter(MetricConstants.LISTENER_TABLE_SUCCESS);
CreateTableRequest createTableRequest = createTableRequestCaptor.getValue();

assertThat(createTableRequest.getDatabaseName(), is(gluePrefix + dbName));
assertThat(createTableRequest.getTableInput().getName(), is(tableName));
assertThat(createTableRequest.getTableInput().getTableType(), is("VIRTUAL_VIEW"));
assertThat(createTableRequest.getTableInput().getViewOriginalText().isEmpty(), is(false));
assertThat(createTableRequest.getTableInput().getViewExpandedText().isEmpty(), is(false));
}

@Test
public void onAlterHiveTable() throws MetaException {
AlterTableEvent event = mock(AlterTableEvent.class);
Expand Down
Loading