diff --git a/data/Dockerfiles/dovecot/Dockerfile b/data/Dockerfiles/dovecot/Dockerfile index f1152c8a1..9e32d775e 100644 --- a/data/Dockerfiles/dovecot/Dockerfile +++ b/data/Dockerfiles/dovecot/Dockerfile @@ -69,6 +69,8 @@ RUN addgroup -g 5000 vmail \ perl-par-packer \ perl-parse-recdescent \ perl-lockfile-simple \ + perl-parallel-forkmanager \ + perl-redis \ libproc2 \ perl-readonly \ perl-regexp-common \ diff --git a/data/Dockerfiles/dovecot/imapsync_runner.pl b/data/Dockerfiles/dovecot/imapsync_runner.pl index 1030603ce..07341f748 100644 --- a/data/Dockerfiles/dovecot/imapsync_runner.pl +++ b/data/Dockerfiles/dovecot/imapsync_runner.pl @@ -2,21 +2,15 @@ use DBI; use LockFile::Simple qw(lock trylock unlock); -use Proc::ProcessTable; use Data::Dumper qw(Dumper); use IPC::Run 'run'; use File::Temp; use Try::Tiny; +use Parallel::ForkManager; +use Redis; use sigtrap 'handler' => \&sig_handler, qw(INT TERM KILL QUIT); sub trim { my $s = shift; $s =~ s/^\s+|\s+$//g; return $s }; -my $t = Proc::ProcessTable->new; -my $imapsync_running = grep { $_->{cmndline} =~ /imapsync\s/i } @{$t->table}; -if ($imapsync_running ge 1) -{ - print "imapsync is active, exiting..."; - exit; -} sub qqw($) { my @params = (); @@ -35,93 +29,51 @@ sub qqw($) { return @params; } -$run_dir="/tmp"; -$dsn = 'DBI:mysql:database=' . $ENV{'DBNAME'} . ';mysql_socket=/var/run/mysqld/mysqld.sock'; -$lock_file = $run_dir . "/imapsync_busy"; -$lockmgr = LockFile::Simple->make(-autoclean => 1, -max => 1); -$lockmgr->lock($lock_file) || die "can't lock ${lock_file}"; -$dbh = DBI->connect($dsn, $ENV{'DBUSER'}, $ENV{'DBPASS'}, { - mysql_auto_reconnect => 1, - mysql_enable_utf8mb4 => 1 -}); -$dbh->do("UPDATE imapsync SET is_running = 0"); +our $pm; sub sig_handler { - # Send die to force exception in "run" + if (defined $pm) { + foreach my $child_pid (keys %{ $pm->{processes} }) { + kill 'TERM', $child_pid; + } + } die "sig_handler received signal, preparing to exit...\n"; }; -open my $file, '<', "/etc/sogo/sieve.creds"; -my $creds = <$file>; -close $file; -my ($master_user, $master_pass) = split /:/, $creds; -my $sth = $dbh->prepare("SELECT id, - user1, - user2, - host1, - authmech1, - password1, - exclude, - port1, - enc1, - delete2duplicates, - maxage, - subfolder2, - delete1, - delete2, - automap, - skipcrossduplicates, - maxbytespersecond, - custom_params, - subscribeall, - timeout1, - timeout2, - dry - FROM imapsync - WHERE active = 1 - AND is_running = 0 - AND ( - UNIX_TIMESTAMP(NOW()) - UNIX_TIMESTAMP(last_run) > mins_interval * 60 - OR - last_run IS NULL) - ORDER BY last_run"); +sub run_one_job { + my ($dbh, $row_ref, $master_user, $master_pass) = @_; -$sth->execute(); -my $row; - -while ($row = $sth->fetchrow_arrayref()) { - - $id = @$row[0]; - $user1 = @$row[1]; - $user2 = @$row[2]; - $host1 = @$row[3]; - $authmech1 = @$row[4]; - $password1 = @$row[5]; - $exclude = @$row[6]; - $port1 = @$row[7]; - $enc1 = @$row[8]; - $delete2duplicates = @$row[9]; - $maxage = @$row[10]; - $subfolder2 = @$row[11]; - $delete1 = @$row[12]; - $delete2 = @$row[13]; - $automap = @$row[14]; - $skipcrossduplicates = @$row[15]; - $maxbytespersecond = @$row[16]; - $custom_params = @$row[17]; - $subscribeall = @$row[18]; - $timeout1 = @$row[19]; - $timeout2 = @$row[20]; - $dry = @$row[21]; + my $id = $row_ref->[0]; + my $user1 = $row_ref->[1]; + my $user2 = $row_ref->[2]; + my $host1 = $row_ref->[3]; + my $authmech1 = $row_ref->[4]; + my $password1 = $row_ref->[5]; + my $exclude = $row_ref->[6]; + my $port1 = $row_ref->[7]; + my $enc1 = $row_ref->[8]; + my $delete2duplicates = $row_ref->[9]; + my $maxage = $row_ref->[10]; + my $subfolder2 = $row_ref->[11]; + my $delete1 = $row_ref->[12]; + my $delete2 = $row_ref->[13]; + my $automap = $row_ref->[14]; + my $skipcrossduplicates = $row_ref->[15]; + my $maxbytespersecond = $row_ref->[16]; + my $custom_params = $row_ref->[17]; + my $subscribeall = $row_ref->[18]; + my $timeout1 = $row_ref->[19]; + my $timeout2 = $row_ref->[20]; + my $dry = $row_ref->[21]; if ($enc1 eq "TLS") { $enc1 = "--tls1"; } elsif ($enc1 eq "SSL") { $enc1 = "--ssl1"; } else { undef $enc1; } - my $template = $run_dir . '/imapsync.XXXXXXX'; + my $template = "/tmp/imapsync.XXXXXXX"; my $passfile1 = File::Temp->new(TEMPLATE => $template); my $passfile2 = File::Temp->new(TEMPLATE => $template); - + binmode( $passfile1, ":utf8" ); - + print $passfile1 "$password1\n"; print $passfile2 trim($master_pass) . "\n"; @@ -157,40 +109,130 @@ while ($row = $sth->fetchrow_arrayref()) { '--noreleasecheck']; try { - $is_running = $dbh->prepare("UPDATE imapsync SET is_running = 1, success = NULL, exit_status = NULL WHERE id = ?"); - $is_running->bind_param( 1, ${id} ); + my $is_running = $dbh->prepare("UPDATE imapsync SET is_running = 1, success = NULL, exit_status = NULL WHERE id = ?"); + $is_running->bind_param( 1, $id ); $is_running->execute(); run [@$generated_cmds, @$custom_params_ref], '&>', \my $stdout; - # check exit code and status - ($exit_code, $exit_status) = ($stdout =~ m/Exiting\swith\sreturn\svalue\s(\d+)\s\(([^:)]+)/); + my ($exit_code, $exit_status) = ($stdout =~ m/Exiting\swith\sreturn\svalue\s(\d+)\s\(([^:)]+)/); - $success = 0; + my $success = 0; if (defined $exit_code && $exit_code == 0) { $success = 1; } - $update = $dbh->prepare("UPDATE imapsync SET returned_text = ?, success = ?, exit_status = ? WHERE id = ?"); - $update->bind_param( 1, ${stdout} ); - $update->bind_param( 2, ${success} ); - $update->bind_param( 3, ${exit_status} ); - $update->bind_param( 4, ${id} ); + my $update = $dbh->prepare("UPDATE imapsync SET returned_text = ?, success = ?, exit_status = ? WHERE id = ?"); + $update->bind_param( 1, $stdout ); + $update->bind_param( 2, $success ); + $update->bind_param( 3, $exit_status ); + $update->bind_param( 4, $id ); $update->execute(); } catch { - $update = $dbh->prepare("UPDATE imapsync SET returned_text = 'Could not start or finish imapsync', success = 0 WHERE id = ?"); - $update->bind_param( 1, ${id} ); + my $update = $dbh->prepare("UPDATE imapsync SET returned_text = 'Could not start or finish imapsync', success = 0 WHERE id = ?"); + $update->bind_param( 1, $id ); $update->execute(); } finally { - $update = $dbh->prepare("UPDATE imapsync SET last_run = NOW(), is_running = 0 WHERE id = ?"); - $update->bind_param( 1, ${id} ); + my $update = $dbh->prepare("UPDATE imapsync SET last_run = NOW(), is_running = 0 WHERE id = ?"); + $update->bind_param( 1, $id ); $update->execute(); }; - - } +my $run_dir = "/tmp"; +my $dsn = 'DBI:mysql:database=' . $ENV{'DBNAME'} . ';mysql_socket=/var/run/mysqld/mysqld.sock'; +my $lock_file = $run_dir . "/imapsync_busy"; +my $lockmgr = LockFile::Simple->make(-autoclean => 1, -max => 1); +$lockmgr->lock($lock_file) || die "can't lock ${lock_file}"; + +my $max_parallel = 1; +try { + my $redis = Redis->new( + server => 'redis-mailcow:6379', + password => $ENV{'REDISPASS'}, + reconnect => 10, + every => 1_000_000, + ); + my $val = $redis->get('SYNCJOBS_MAX_PARALLEL'); + if (defined $val && $val =~ /^\d+$/) { + $max_parallel = int($val); + } + $redis->quit(); +} catch { + warn "Could not read SYNCJOBS_MAX_PARALLEL from Redis, defaulting to 1: $_"; +}; +$max_parallel = 1 if $max_parallel < 1; +$max_parallel = 50 if $max_parallel > 50; + +my $dbh = DBI->connect($dsn, $ENV{'DBUSER'}, $ENV{'DBPASS'}, { + mysql_auto_reconnect => 1, + mysql_enable_utf8mb4 => 1 +}); +$dbh->do("UPDATE imapsync SET is_running = 0"); + +open my $file, '<', "/etc/sogo/sieve.creds"; +my $creds = <$file>; +close $file; +my ($master_user, $master_pass) = split /:/, $creds; + +my $sth = $dbh->prepare("SELECT id, + user1, + user2, + host1, + authmech1, + password1, + exclude, + port1, + enc1, + delete2duplicates, + maxage, + subfolder2, + delete1, + delete2, + automap, + skipcrossduplicates, + maxbytespersecond, + custom_params, + subscribeall, + timeout1, + timeout2, + dry + FROM imapsync + WHERE active = 1 + AND is_running = 0 + AND ( + UNIX_TIMESTAMP(NOW()) - UNIX_TIMESTAMP(last_run) > mins_interval * 60 + OR + last_run IS NULL) + ORDER BY last_run"); + +$sth->execute(); +my @jobs; +while (my $row = $sth->fetchrow_arrayref()) { + push @jobs, [ @$row ]; +} $sth->finish(); $dbh->disconnect(); +$pm = Parallel::ForkManager->new($max_parallel); + +JOB: +foreach my $job (@jobs) { + my $pid = $pm->start; + if ($pid) { + next JOB; + } + + my $child_dbh = DBI->connect($dsn, $ENV{'DBUSER'}, $ENV{'DBPASS'}, { + mysql_auto_reconnect => 1, + mysql_enable_utf8mb4 => 1 + }); + run_one_job($child_dbh, $job, $master_user, $master_pass); + $child_dbh->disconnect(); + + $pm->finish; +} + +$pm->wait_all_children; + $lockmgr->unlock($lock_file); diff --git a/data/web/admin/system.php b/data/web/admin/system.php index 4db40c753..5ea53b38a 100644 --- a/data/web/admin/system.php +++ b/data/web/admin/system.php @@ -106,6 +106,7 @@ $template_data = [ 'f2b_data' => $f2b_data, 'f2b_banlist_url' => getBaseUrl() . "/f2b-banlist?id=" . $f2b_data['banlist_id'], 'q_data' => quarantine('settings'), + 'sj_data' => mailbox('get', 'syncjob_settings'), 'qn_data' => quota_notification('get'), 'pw_reset_data' => reset_password('get_notification'), 'rsettings_map' => file_get_contents('http://nginx:8081/settings.php'), diff --git a/data/web/api/openapi.yaml b/data/web/api/openapi.yaml index cf54f6add..43a3c8a00 100644 --- a/data/web/api/openapi.yaml +++ b/data/web/api/openapi.yaml @@ -3931,6 +3931,64 @@ paths: type: object type: object summary: Update sync job + /api/v1/edit/syncjob_settings: + post: + responses: + "401": + $ref: "#/components/responses/Unauthorized" + "200": + content: + application/json: + examples: + response: + value: + log: + - entity + - action + - object + msg: + - message + - entity name + type: success + schema: + properties: + log: + description: contains request object + items: {} + type: array + msg: + items: {} + type: array + type: + enum: + - success + - danger + - error + type: string + type: object + description: OK + headers: {} + tags: + - Sync jobs + description: >- + Update the global sync job settings. Currently exposes the maximum + number of imapsync processes that are allowed to run in parallel. + Admin access required. + operationId: Update sync job settings + requestBody: + content: + application/json: + schema: + example: + max_parallel: 4 + properties: + max_parallel: + description: >- + Maximum number of imapsync processes allowed to run in + parallel (1 = sequential behavior, max 50) + type: integer + type: object + summary: Update sync job settings /api/v1/edit/user-acl: post: responses: @@ -5629,6 +5687,36 @@ paths: description: You can list all syn jobs existing in system. operationId: Get sync jobs summary: Get sync jobs + /api/v1/get/syncjob_settings: + get: + responses: + "401": + $ref: "#/components/responses/Unauthorized" + "200": + content: + application/json: + examples: + response: + value: + max_parallel: 4 + schema: + properties: + max_parallel: + description: >- + Maximum number of imapsync processes allowed to run in + parallel (1 = sequential behavior) + type: integer + type: object + description: OK + headers: {} + tags: + - Sync jobs + description: >- + Return the global sync job settings. Currently exposes the maximum + number of imapsync processes allowed to run in parallel. Admin access + required. + operationId: Get sync job settings + summary: Get sync job settings "/api/v1/get/tls-policy-map/{id}": get: parameters: diff --git a/data/web/inc/functions.mailbox.inc.php b/data/web/inc/functions.mailbox.inc.php index adb330ea8..1073ec84d 100644 --- a/data/web/inc/functions.mailbox.inc.php +++ b/data/web/inc/functions.mailbox.inc.php @@ -2265,6 +2265,35 @@ function mailbox($_action, $_type, $_data = null, $_extra = null) { ); } break; + case 'syncjob_settings': + if (!isset($_SESSION['mailcow_cc_role']) || $_SESSION['mailcow_cc_role'] != 'admin') { + $_SESSION['return'][] = array( + 'type' => 'danger', + 'log' => array(__FUNCTION__, $_action, $_type, $_data_log, $_attr), + 'msg' => 'access_denied' + ); + return false; + } + $max_parallel = intval($_data['max_parallel']); + if ($max_parallel < 1) { $max_parallel = 1; } + if ($max_parallel > 50) { $max_parallel = 50; } + try { + $redis->Set('SYNCJOBS_MAX_PARALLEL', $max_parallel); + } + catch (RedisException $e) { + $_SESSION['return'][] = array( + 'type' => 'danger', + 'log' => array(__FUNCTION__, $_action, $_type, $_data_log, $_attr), + 'msg' => array('redis_error', $e->getMessage()) + ); + return false; + } + $_SESSION['return'][] = array( + 'type' => 'success', + 'log' => array(__FUNCTION__, $_action, $_type, $_data_log, $_attr), + 'msg' => 'syncjob_settings_saved' + ); + break; case 'syncjob': if (!is_array($_data['id'])) { $ids = array(); @@ -4619,6 +4648,20 @@ function mailbox($_action, $_type, $_data = null, $_extra = null) { } return $syncjobdetails; break; + case 'syncjob_settings': + if (!isset($_SESSION['mailcow_cc_role']) || $_SESSION['mailcow_cc_role'] != 'admin') { + return false; + } + $settings = array(); + try { + $max_parallel = $redis->Get('SYNCJOBS_MAX_PARALLEL'); + } + catch (RedisException $e) { + $max_parallel = null; + } + $settings['max_parallel'] = intval($max_parallel) ?: 1; + return $settings; + break; case 'syncjobs': $syncjobdata = array(); if (isset($_data) && filter_var($_data, FILTER_VALIDATE_EMAIL)) { diff --git a/data/web/json_api.php b/data/web/json_api.php index 2d315a0b1..687cbe078 100644 --- a/data/web/json_api.php +++ b/data/web/json_api.php @@ -1104,6 +1104,10 @@ if (isset($_GET['query'])) { break; } break; + case "syncjob_settings": + $data = mailbox('get', 'syncjob_settings'); + process_get_return($data); + break; case "active-user-sieve": if (isset($object)) { $sieve_filter = mailbox('get', 'active_user_sieve', $object); @@ -1970,6 +1974,9 @@ if (isset($_GET['query'])) { case "syncjob": process_edit_return(mailbox('edit', 'syncjob', array_merge(array('id' => $items), $attr))); break; + case "syncjob_settings": + process_edit_return(mailbox('edit', 'syncjob_settings', $attr)); + break; case "filter": process_edit_return(mailbox('edit', 'filter', array_merge(array('id' => $items), $attr))); break; diff --git a/data/web/lang/lang.de-de.json b/data/web/lang/lang.de-de.json index 3672762df..122531ad1 100644 --- a/data/web/lang/lang.de-de.json +++ b/data/web/lang/lang.de-de.json @@ -372,6 +372,8 @@ "spamfilter": "Spamfilter", "subject": "Betreff", "success": "Erfolg", + "syncjobs": "Sync-Jobs", + "syncjobs_max_parallel": "Maximale Anzahl paralleler Sync-Jobs
Wie viele imapsync-Prozesse dürfen gleichzeitig laufen. 1 = sequentiell (aktuelles Verhalten).", "sys_mails": "System-E-Mails", "task": "Aufgabe", "text": "Text", @@ -1200,6 +1202,7 @@ "template_modified": "Änderungen am Template %s wurden gespeichert", "template_removed": "Template ID %s wurde gelöscht", "sogo_profile_reset": "ActiveSync-Gerät des Benutzers %s wurde zurückgesetzt", + "syncjob_settings_saved": "Sync-Job-Einstellungen wurden gespeichert", "tls_policy_map_entry_deleted": "TLS-Richtlinie mit der ID %s wurde gelöscht", "tls_policy_map_entry_saved": "TLS-Richtlinieneintrag \"%s\" wurde gespeichert", "ui_texts": "Änderungen an UI-Texten", diff --git a/data/web/lang/lang.en-gb.json b/data/web/lang/lang.en-gb.json index e786bcbe5..eaa478a60 100644 --- a/data/web/lang/lang.en-gb.json +++ b/data/web/lang/lang.en-gb.json @@ -382,6 +382,8 @@ "spamfilter": "Spam filter", "subject": "Subject", "success": "Success", + "syncjobs": "Sync jobs", + "syncjobs_max_parallel": "Maximum parallel sync jobs
How many imapsync processes are allowed to run in parallel. 1 = sequential (legacy behavior).", "sys_mails": "System mails", "task": "Task", "text": "Text", @@ -1204,6 +1206,7 @@ "settings_map_added": "Added settings map entry", "settings_map_removed": "Removed settings map ID %s", "sogo_profile_reset": "SOGo profile for user %s was reset", + "syncjob_settings_saved": "Sync job settings have been saved", "template_added": "Added template %s", "template_modified": "Changes to template %s have been saved", "template_removed": "Template ID %s has been deleted", diff --git a/data/web/templates/admin.twig b/data/web/templates/admin.twig index 651107ccf..62033b9f3 100644 --- a/data/web/templates/admin.twig +++ b/data/web/templates/admin.twig @@ -21,6 +21,7 @@
  • +
  • @@ -50,6 +51,7 @@ {% include 'admin/tab-config-fwdhosts.twig' %} {% include 'admin/tab-config-f2b.twig' %} {% include 'admin/tab-config-quarantine.twig' %} + {% include 'admin/tab-config-syncjobs.twig' %} {% include 'admin/tab-config-quota.twig' %} {% include 'admin/tab-config-rsettings.twig' %} {% include 'admin/tab-config-customize.twig' %} diff --git a/data/web/templates/admin/tab-config-syncjobs.twig b/data/web/templates/admin/tab-config-syncjobs.twig new file mode 100644 index 000000000..1e2c6e064 --- /dev/null +++ b/data/web/templates/admin/tab-config-syncjobs.twig @@ -0,0 +1,21 @@ +
    +
    +
    + + {{ lang.admin.syncjobs }} +
    +
    +
    +
    + +
    + +
    +
    + +
    +
    +
    +
    diff --git a/docker-compose.yml b/docker-compose.yml index 0a9578de6..8faa60823 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -252,7 +252,7 @@ services: - sogo dovecot-mailcow: - image: ghcr.io/mailcow/dovecot:2.3.21.1-2 + image: ghcr.io/mailcow/dovecot:2.3.21.1-3 depends_on: - mysql-mailcow - netfilter-mailcow