New module postgresql_copy: copy data between a file and a table (#56835)

* New module postgresql_copy

* New module postgresql_copy: added tests

* New module postgresql_copy: changed tests

* New module postgresql_copy: doc format fixes

* New module postgresql_copy: fixes

* New module postgresql_copy: added upper, PostgreSQL

* New module postgresql_copy: fixed description

* New module postgresql_copy: added note about superuser

* New module postgresql_copy: remove SQLParseError

* New module postgresql_copy: fixed opt_need_quotes type

* New module postgresql_copy: fixed check_mode

* New module postgresql_copy: small fix
This commit is contained in:
Andrey Klychkov 2019-06-03 12:11:50 +03:00 committed by Will Thames
parent 0c66a5d3ca
commit 96bf243265
4 changed files with 691 additions and 4 deletions

View file

@ -0,0 +1,424 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright: (c) 2019, Andrew Klychkov (@Andersson007) <aaklychkov@mail.ru>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
ANSIBLE_METADATA = {
'metadata_version': '1.1',
'supported_by': 'community',
'status': ['preview']
}
DOCUMENTATION = r'''
---
module: postgresql_copy
short_description: Copy data between a file/program and a PostgreSQL table
description:
- Copy data between a file/program and a PostgreSQL table U(https://www.postgresql.org/docs/current/sql-copy.html).
version_added: '2.9'
options:
copy_to:
description:
- Copy the contents of a table to a file.
- Can also copy the results of a SELECT query.
- Mutually exclusive with I(copy_from) and I(dst).
type: path
aliases: [ to ]
copy_from:
description:
- Copy data from a file to a table (appending the data to whatever is in the table already).
- Mutually exclusive with I(copy_to) and I(src).
type: path
aliases: [ from ]
src:
description:
- Copy data from I(copy_from) to I(src=tablename).
- Used with I(copy_to) only.
type: str
aliases: [ source ]
dst:
description:
- Copy data to I(dst=tablename) from I(copy_from=/path/to/data.file).
- Used with I(copy_from) only.
type: str
aliases: [ destination ]
columns:
description:
- List of column names for the src/dst table to COPY FROM/TO.
type: list
aliases: [ column ]
program:
description:
- Mark I(src)/I(dst) as a program. Data will be copied to/from a program.
- See block Examples and PROGRAM arg description U(https://www.postgresql.org/docs/current/sql-copy.html).
type: bool
options:
description:
- Options of COPY command.
- See the full list of available options U(https://www.postgresql.org/docs/current/sql-copy.html).
type: dict
db:
description:
- Name of database to connect to.
type: str
aliases: [ login_db ]
session_role:
description:
- Switch to session_role after connecting.
The specified session_role must be a role that the current login_user is a member of.
- Permissions checking for SQL commands is carried out as though
the session_role were the one that had logged in originally.
type: str
notes:
- Supports PostgreSQL version 9.4+.
- COPY command is only allowed to database superusers.
- if I(check_mode=yes), we just check the src/dst table availability
and return the COPY query that aclually has not been executed.
- If i(check_mode=yes) and the source has been passed as SQL, the module
will execute it and rolled the transaction back but pay attention
it can affect database performance (e.g., if SQL collects a lot of data).
author:
- Andrew Klychkov (@Andersson007)
extends_documentation_fragment: postgres
'''
EXAMPLES = r'''
- name: Copy text TAB-separated data from file /tmp/data.txt to acme table
postgresql_copy:
copy_from: /tmp/data.txt
dst: acme
- name: Copy CSV (comma-separated) data from file /tmp/data.csv to columns id, name of table acme
postgresql_copy:
copy_from: /tmp/data.csv
dst: acme
columns: id,name
options:
format: csv
- name: >
Copy text vertical-bar-separated data from file /tmp/data.txt to bar table.
The NULL values are specified as N
postgresql_copy:
copy_from: /tmp/data.csv
dst: bar
options:
delimiter: '|'
null: 'N'
- name: Copy data from acme table to file /tmp/data.txt in text format, TAB-separated
postgresql_copy:
src: acme
copy_to: /tmp/data.txt
- name: Copy data from SELECT query to/tmp/data.csv in CSV format
postgresql_copy:
src: 'SELECT * FROM acme'
copy_to: /tmp/data.csv
options:
format: csv
- name: Copy CSV data from my_table to gzip
postgresql_copy:
src: my_table
copy_to: 'gzip > /tmp/data.csv.gz'
program: yes
options:
format: csv
- name: >
Copy data from columns id, name of table bar to /tmp/data.txt.
Output format is text, vertical-bar-separated, NULL as N
postgresql_copy:
src: bar
columns:
- id
- name
copy_to: /tmp/data.csv
options:
delimiter: '|'
null: 'N'
'''
RETURN = r'''
queries:
description: List of executed queries.
returned: always
type: str
sample: [ "COPY test_table FROM '/tmp/data_file.txt' (FORMAT csv, DELIMITER ',', NULL 'NULL')" ]
src:
description: Data source.
returned: always
type: str
sample: "mytable"
dst:
description: Data destination.
returned: always
type: str
sample: "/tmp/data.csv"
'''
try:
from psycopg2.extras import DictCursor
except ImportError:
# psycopg2 is checked by connect_to_db()
# from ansible.module_utils.postgres
pass
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.database import pg_quote_identifier
from ansible.module_utils.postgres import connect_to_db, postgres_common_argument_spec
from ansible.module_utils._text import to_native
from ansible.module_utils.six import iteritems
def exec_sql(obj, query, ddl=False, add_to_executed=True):
"""Execute SQL.
Auxiliary function for PostgreSQL user classes.
Returns a query result if possible or True/False if ddl=True arg was passed.
It necessary for statements that don't return any result (like DDL queries).
Arguments:
obj (obj) -- must be an object of a user class.
The object must have module (AnsibleModule class object) and
cursor (psycopg cursor object) attributes
query (str) -- SQL query to execute
ddl (bool) -- must return True or False instead of rows (typical for DDL queries)
(default False)
add_to_executed (bool) -- append the query to obj.executed_queries attribute
"""
try:
obj.cursor.execute(query)
if add_to_executed:
obj.executed_queries.append(query)
if not ddl:
res = obj.cursor.fetchall()
return res
return True
except Exception as e:
obj.module.fail_json(msg="Cannot execute SQL '%s': %s" % (query, to_native(e)))
return False
class PgCopyData(object):
"""Implements behavior of COPY FROM, COPY TO PostgreSQL command.
Arguments:
module (AnsibleModule) -- object of AnsibleModule class
cursor (cursor) -- cursor objec of psycopg2 library
Attributes:
module (AnsibleModule) -- object of AnsibleModule class
cursor (cursor) -- cursor objec of psycopg2 library
changed (bool) -- something was changed after execution or not
executed_queries (list) -- executed queries
dst (str) -- data destination table (when copy_from)
src (str) -- data source table (when copy_to)
opt_need_quotes (tuple) -- values of these options must be passed
to SQL in quotes
"""
def __init__(self, module, cursor):
self.module = module
self.cursor = cursor
self.executed_queries = []
self.changed = False
self.dst = ''
self.src = ''
self.opt_need_quotes = (
'DELIMITER',
'NULL',
'QUOTE',
'ESCAPE',
'ENCODING',
)
def copy_from(self):
"""Implements COPY FROM command behavior."""
self.src = self.module.params['copy_from']
self.dst = self.module.params['dst']
query_fragments = ['COPY %s' % pg_quote_identifier(self.dst, 'table')]
if self.module.params.get('columns'):
query_fragments.append('(%s)' % ','.join(self.module.params['columns']))
query_fragments.append('FROM')
if self.module.params.get('program'):
query_fragments.append('PROGRAM')
query_fragments.append("'%s'" % self.src)
if self.module.params.get('options'):
query_fragments.append(self.__transform_options())
# Note: check mode is implemented here:
if self.module.check_mode:
self.changed = self.__check_table(self.dst)
if self.changed:
self.executed_queries.append(' '.join(query_fragments))
else:
if exec_sql(self, ' '.join(query_fragments), ddl=True):
self.changed = True
def copy_to(self):
"""Implements COPY TO command behavior."""
self.src = self.module.params['src']
self.dst = self.module.params['copy_to']
if 'SELECT ' in self.src.upper():
# If src is SQL SELECT statement:
query_fragments = ['COPY (%s)' % self.src]
else:
# If src is a table:
query_fragments = ['COPY %s' % pg_quote_identifier(self.src, 'table')]
if self.module.params.get('columns'):
query_fragments.append('(%s)' % ','.join(self.module.params['columns']))
query_fragments.append('TO')
if self.module.params.get('program'):
query_fragments.append('PROGRAM')
query_fragments.append("'%s'" % self.dst)
if self.module.params.get('options'):
query_fragments.append(self.__transform_options())
# Note: check mode is implemented here:
if self.module.check_mode:
self.changed = self.__check_table(self.src)
if self.changed:
self.executed_queries.append(' '.join(query_fragments))
else:
if exec_sql(self, ' '.join(query_fragments), ddl=True):
self.changed = True
def __transform_options(self):
"""Transform options dict into a suitable string."""
for (key, val) in iteritems(self.module.params['options']):
if key.upper() in self.opt_need_quotes:
self.module.params['options'][key] = "'%s'" % val
opt = ['%s %s' % (key, val) for (key, val) in iteritems(self.module.params['options'])]
return '(%s)' % ', '.join(opt)
def __check_table(self, table):
"""Check table or SQL in transaction mode for check_mode.
Return True if it is OK.
Arguments:
table (str) - Table name that needs to be checked.
It can be SQL SELECT statement that was passed
instead of the table name.
"""
if 'SELECT ' in table.upper():
# In this case table is actually SQL SELECT statement.
# If SQL fails, it's handled by exec_sql():
exec_sql(self, table, add_to_executed=False)
# If exec_sql was passed, it means all is OK:
return True
exec_sql(self, 'SELECT 1 FROM %s' % pg_quote_identifier(table, 'table'),
add_to_executed=False)
# If SQL was executed successfully:
return True
# ===========================================
# Module execution.
#
def main():
argument_spec = postgres_common_argument_spec()
argument_spec.update(
copy_to=dict(type='path', aliases=['to']),
copy_from=dict(type='path', aliases=['from']),
src=dict(type='str', aliases=['source']),
dst=dict(type='str', aliases=['destination']),
columns=dict(type='list', aliases=['column']),
options=dict(type='dict'),
program=dict(type='bool'),
db=dict(type='str', aliases=['login_db']),
session_role=dict(type='str'),
)
module = AnsibleModule(
argument_spec=argument_spec,
supports_check_mode=True,
mutually_exclusive=[
['copy_from', 'copy_to'],
['copy_from', 'src'],
['copy_to', 'dst'],
]
)
# Note: we don't need to check mutually exclusive params here, because they are
# checked automatically by AnsibleModule (mutually_exclusive=[] list above).
if module.params.get('copy_from') and not module.params.get('dst'):
module.fail_json(msg='dst param is necessary with copy_from')
elif module.params.get('copy_to') and not module.params.get('src'):
module.fail_json(msg='src param is necessary with copy_to')
# Connect to DB and make cursor object:
db_connection = connect_to_db(module, autocommit=False)
cursor = db_connection.cursor(cursor_factory=DictCursor)
##############
# Create the object and do main job:
data = PgCopyData(module, cursor)
# Note: parameters like dst, src, etc. are got
# from module object into data object of PgCopyData class.
# Therefore not need to pass args to the methods below.
# Note: check mode is implemented inside the methods below
# by checking passed module.check_mode arg.
if module.params.get('copy_to'):
data.copy_to()
elif module.params.get('copy_from'):
data.copy_from()
# Finish:
if module.check_mode:
db_connection.rollback()
else:
db_connection.commit()
cursor.close()
db_connection.close()
# Return some values:
module.exit_json(
changed=data.changed,
queries=data.executed_queries,
src=data.src,
dst=data.dst,
)
if __name__ == '__main__':
main()

View file

@ -46,10 +46,14 @@ options:
aliases: [ ssl_rootcert ]
notes:
- The default authentication assumes that you are either logging in as or sudo'ing to the C(postgres) account on the host.
- This module uses I(psycopg2), a Python PostgreSQL database adapter. You must ensure that psycopg2 is installed on
the host before using this module. If the remote host is the PostgreSQL server (which is the default case), then
PostgreSQL must also be installed on the remote host. For Ubuntu-based systems, install the C(postgresql), C(libpq-dev),
and C(python-psycopg2) packages on the remote host before using this module.
- To avoid "Peer authentication failed for user postgres" error,
use postgres user as a I(become_user).
- This module uses psycopg2, a Python PostgreSQL database adapter. You must
ensure that psycopg2 is installed on the host before using this module.
- If the remote host is the PostgreSQL server (which is the default case), then
PostgreSQL must also be installed on the remote host.
- For Ubuntu-based systems, install the postgresql, libpq-dev, and python-psycopg2 packages
on the remote host before using this module.
- The ca_cert parameter requires at least Postgres version 8.4 and I(psycopg2) version 2.4.3.
requirements: [ psycopg2 ]
'''

View file

@ -833,6 +833,10 @@
- include: test_target_role.yml
when: postgres_version_resp.stdout is version('9.1', '>=')
# Test postgresql_copy module
- include: postgresql_copy.yml
when: postgres_version_resp.stdout is version('9.4', '>=')
# Test postgresql_ext.
# pg_extension system view is available from PG 9.1.
# The tests are restricted by Fedora because there will be errors related with

View file

@ -0,0 +1,255 @@
# Copyright: (c) 2019, Andrew Klychkov (@Andersson007) <aaklychkov@mail.ru>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
# The file for testing postgresql_copy module.
- vars:
test_table: acme
data_file_txt: /tmp/data.txt
data_file_csv: /tmp/data.csv
task_parameters: &task_parameters
become_user: '{{ pg_user }}'
become: True
register: result
pg_parameters: &pg_parameters
login_user: '{{ pg_user }}'
login_db: postgres
block:
# Test preparation:
- name: postgresql_copy - create test table
<<: *task_parameters
postgresql_table:
<<: *pg_parameters
name: '{{ test_table }}'
columns:
- id int
- name text
# Insert the data:
- name: postgresql_copy - insert rows into test table
<<: *task_parameters
postgresql_query:
<<: *pg_parameters
query: "INSERT INTO {{ test_table }} (id, name) VALUES (1, 'first')"
- name: postgresql_copy - ensure that test data files don't exist
<<: *task_parameters
file:
path: '{{ item }}'
state: absent
with_items:
- '{{ data_file_csv }}'
- '{{ data_file_txt }}'
# ##############
# Do main tests:
# check_mode - if it's OK, must always return changed=True:
- name: postgresql_copy - check_mode, copy test table content to data_file_txt
check_mode: yes
<<: *task_parameters
postgresql_copy:
<<: *pg_parameters
copy_to: '{{ data_file_txt }}'
src: '{{ test_table }}'
- assert:
that:
- result.changed == true
# check that nothing changed after the previous step:
- name: postgresql_copy - check that data_file_txt doesn't exist
<<: *task_parameters
ignore_errors: yes
shell: head -n 1 '{{ data_file_txt }}'
- assert:
that:
- result.failed == true
- result.rc == 1
# check_mode - if it's OK, must always return changed=True:
- name: postgresql_copy - check_mode, copy test table content from data_file_txt
check_mode: yes
<<: *task_parameters
postgresql_copy:
<<: *pg_parameters
copy_from: '{{ data_file_txt }}'
dst: '{{ test_table }}'
- assert:
that:
- result.changed == true
# check that nothing changed after the previous step:
- name: postgresql_copy - check that test table continue to have one row
<<: *task_parameters
postgresql_query:
<<: *pg_parameters
query: 'SELECT * FROM {{ test_table }}'
- assert:
that:
- result.rowcount == 1
# check_mode - test must fail because test table doesn't exist:
- name: postgresql_copy - check_mode, copy non existent table to data_file_txt
check_mode: yes
ignore_errors: yes
<<: *task_parameters
postgresql_copy:
<<: *pg_parameters
copy_to: '{{ data_file_txt }}'
src: non_existent_table
- assert:
that:
- result.failed == true
- result.queries is not defined
- name: postgresql_copy - copy test table data to data_file_txt
<<: *task_parameters
postgresql_copy:
<<: *pg_parameters
copy_to: '{{ data_file_txt }}'
src: '{{ test_table }}'
- assert:
that:
- result.changed == true
- result.queries == ["COPY \"{{ test_table }}\" TO '{{ data_file_txt }}'"]
- result.src == '{{ test_table }}'
- result.dst == '{{ data_file_txt }}'
# check the prev test
- name: postgresql_copy - check data_file_txt exists and not empty
<<: *task_parameters
shell: 'head -n 1 {{ data_file_txt }}'
- assert:
that:
- result.stdout == '1\tfirst'
# test different options and columns
- name: postgresql_copy - copy test table data to data_file_csv with options and columns
<<: *task_parameters
postgresql_copy:
<<: *pg_parameters
copy_to: '{{ data_file_csv }}'
src: '{{ test_table }}'
columns:
- id
- name
options:
format: csv
- assert:
that:
- result.changed == true
- result.queries == ["COPY \"{{ test_table }}\" (id,name) TO '{{ data_file_csv }}' (format csv)"]
- result.src == '{{ test_table }}'
- result.dst == '{{ data_file_csv }}'
# check the prev test
- name: postgresql_copy - check data_file_csv exists and not empty
<<: *task_parameters
shell: 'head -n 1 {{ data_file_csv }}'
- assert:
that:
- result.stdout == '1,first'
- name: postgresql_copy - copy from data_file_csv to test table
<<: *task_parameters
postgresql_copy:
<<: *pg_parameters
copy_from: '{{ data_file_csv }}'
dst: '{{ test_table }}'
columns:
- id
- name
options:
format: csv
- assert:
that:
- result.changed == true
- result.queries == ["COPY \"{{ test_table }}\" (id,name) FROM '{{ data_file_csv }}' (format csv)"]
- result.dst == '{{ test_table }}'
- result.src == '{{ data_file_csv }}'
- name: postgresql_copy - check that there are two rows in test table after the prev step
<<: *task_parameters
postgresql_query:
<<: *pg_parameters
query: "SELECT * FROM {{ test_table }} WHERE id = '1' AND name = 'first'"
- assert:
that:
- result.rowcount == 2
- name: postgresql_copy - test program option, copy to program
<<: *task_parameters
postgresql_copy:
<<: *pg_parameters
src: '{{ test_table }}'
copy_to: '/bin/true'
program: yes
columns: id, name
options:
delimiter: '|'
when: ansible_distribution != 'FreeBSD'
- assert:
that:
- result.changed == true
- result.queries == ["COPY \"{{ test_table }}\" (id, name) TO PROGRAM '/bin/true' (delimiter '|')"]
- result.src == '{{ test_table }}'
- result.dst == '/bin/true'
when: ansible_distribution != 'FreeBSD'
- name: postgresql_copy - test program option, copy from program
<<: *task_parameters
postgresql_copy:
<<: *pg_parameters
dst: '{{ test_table }}'
copy_from: 'echo 1,first'
program: yes
columns: id, name
options:
delimiter: ','
- assert:
that:
- result.changed == true
- result.queries == ["COPY \"{{ test_table }}\" (id, name) FROM PROGRAM 'echo 1,first' (delimiter ',')"]
- result.dst == '{{ test_table }}'
- result.src == 'echo 1,first'
when: ansible_distribution != 'FreeBSD'
- name: postgresql_copy - check that there are three rows in test table after the prev step
<<: *task_parameters
postgresql_query:
<<: *pg_parameters
query: "SELECT * FROM {{ test_table }} WHERE id = '1' AND name = 'first'"
- assert:
that:
- result.rowcount == 3
# clean up
- name: postgresql_copy - remove test table
<<: *task_parameters
postgresql_table:
<<: *pg_parameters
name: '{{ test_table }}'
state: absent
- name: postgresql_copy - remove test data files
<<: *task_parameters
file:
path: '{{ item }}'
state: absent
with_items:
- '{{ data_file_csv }}'
- '{{ data_file_txt }}'